From 660d5210d6e804ae4ca2018f0db8fb12c7486ebf Mon Sep 17 00:00:00 2001 From: Youngteac Hong Date: Thu, 23 May 2024 19:58:16 +0900 Subject: [PATCH] Add snapshot-disable-gc flag --- cmd/yorkie/server.go | 6 +++ pkg/document/document.go | 31 +++++++++++++++- pkg/document/internal_document.go | 4 +- server/backend/config.go | 3 ++ server/config.go | 1 + server/packs/packs.go | 2 +- server/packs/pushpull.go | 2 +- server/packs/snapshots.go | 2 +- test/integration/gc_test.go | 62 +++++++++++++++++++++++++++++++ 9 files changed, 106 insertions(+), 7 deletions(-) diff --git a/cmd/yorkie/server.go b/cmd/yorkie/server.go index 2336bab51..cca3b91d5 100644 --- a/cmd/yorkie/server.go +++ b/cmd/yorkie/server.go @@ -291,6 +291,12 @@ func init() { server.DefaultSnapshotWithPurgingChanges, "Whether to delete previous changes when the snapshot is created.", ) + cmd.Flags().BoolVar( + &conf.Backend.SnapshotDisableGC, + "backend-snapshot-disable-gc", + server.DefaultSnapshotDisableGC, + "Whether to disable garbage collection of snapshots.", + ) cmd.Flags().Uint64Var( &conf.Backend.AuthWebhookMaxRetries, "auth-webhook-max-retries", diff --git a/pkg/document/document.go b/pkg/document/document.go index 347b25507..098c7629b 100644 --- a/pkg/document/document.go +++ b/pkg/document/document.go @@ -65,6 +65,22 @@ type BroadcastRequest struct { Payload []byte } +// Option configures Options. +type Option func(*Options) + +// Options configures how we set up the document. +type Options struct { + // DisableGC disables garbage collection. + DisableGC bool +} + +// WithDisableGC configures the document to disable garbage collection. +func WithDisableGC() Option { + return func(o *Options) { + o.DisableGC = true + } +} + // Document represents a document accessible to the user. // // How document works: @@ -76,6 +92,9 @@ type Document struct { // doc is the original data of the actual document. doc *InternalDocument + // options is the options to configure the document. + options Options + // cloneRoot is a copy of `doc.root` to be exposed to the user and is used to // protect `doc.root`. cloneRoot *crdt.Root @@ -100,9 +119,15 @@ type Document struct { } // New creates a new instance of Document. -func New(key key.Key) *Document { +func New(key key.Key, opts ...Option) *Document { + var options Options + for _, opt := range opts { + opt(&options) + } + return &Document{ doc: NewInternalDocument(key), + options: options, events: make(chan DocEvent, 1), broadcastRequests: make(chan BroadcastRequest, 1), broadcastResponses: make(chan error, 1), @@ -197,7 +222,9 @@ func (d *Document) ApplyChangePack(pack *change.Pack) error { d.doc.checkpoint = d.doc.checkpoint.Forward(pack.Checkpoint) // 04. Do Garbage collection. - d.GarbageCollect(pack.MinSyncedTicket) + if !d.options.DisableGC { + d.GarbageCollect(pack.MinSyncedTicket) + } // 05. Update the status. if pack.IsRemoved { diff --git a/pkg/document/internal_document.go b/pkg/document/internal_document.go index 4f1cdd1ad..40697b809 100644 --- a/pkg/document/internal_document.go +++ b/pkg/document/internal_document.go @@ -142,7 +142,7 @@ func (d *InternalDocument) HasLocalChanges() bool { } // ApplyChangePack applies the given change pack into this document. -func (d *InternalDocument) ApplyChangePack(pack *change.Pack) error { +func (d *InternalDocument) ApplyChangePack(pack *change.Pack, disableGC bool) error { // 01. Apply remote changes to both the cloneRoot and the document. if len(pack.Snapshot) > 0 { if err := d.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq); err != nil { @@ -166,7 +166,7 @@ func (d *InternalDocument) ApplyChangePack(pack *change.Pack) error { // 03. Update the checkpoint. d.checkpoint = d.checkpoint.Forward(pack.Checkpoint) - if pack.MinSyncedTicket != nil { + if !disableGC && pack.MinSyncedTicket != nil { if _, err := d.GarbageCollect(pack.MinSyncedTicket); err != nil { return err } diff --git a/server/backend/config.go b/server/backend/config.go index 1201b4faf..38d87b2d3 100644 --- a/server/backend/config.go +++ b/server/backend/config.go @@ -55,6 +55,9 @@ type Config struct { // SnapshotWithPurgingChanges is whether to delete previous changes when the snapshot is created. SnapshotWithPurgingChanges bool `yaml:"SnapshotWithPurgingChages"` + // SnapshotDisableGC is whether to disable garbage collection of snapshots. + SnapshotDisableGC bool + // AuthWebhookMaxRetries is the max count that retries the authorization webhook. AuthWebhookMaxRetries uint64 `yaml:"AuthWebhookMaxRetries"` diff --git a/server/config.go b/server/config.go index 2fcc9b3ca..eea4f3a25 100644 --- a/server/config.go +++ b/server/config.go @@ -58,6 +58,7 @@ const ( DefaultSnapshotThreshold = 500 DefaultSnapshotInterval = 1000 DefaultSnapshotWithPurgingChanges = false + DefaultSnapshotDisableGC = false DefaultAuthWebhookMaxRetries = 10 DefaultAuthWebhookMaxWaitInterval = 3000 * time.Millisecond diff --git a/server/packs/packs.go b/server/packs/packs.go index 58d952191..c5d076f68 100644 --- a/server/packs/packs.go +++ b/server/packs/packs.go @@ -236,7 +236,7 @@ func BuildDocumentForServerSeq( change.InitialCheckpoint.NextServerSeq(serverSeq), changes, nil, - )); err != nil { + ), be.Config.SnapshotDisableGC); err != nil { return nil, err } diff --git a/server/packs/pushpull.go b/server/packs/pushpull.go index ca87c87ae..20303689a 100644 --- a/server/packs/pushpull.go +++ b/server/packs/pushpull.go @@ -151,7 +151,7 @@ func pullSnapshot( doc.Checkpoint().NextServerSeq(docInfo.ServerSeq), reqPack.Changes, nil, - )); err != nil { + ), be.Config.SnapshotDisableGC); err != nil { return nil, err } } diff --git a/server/packs/snapshots.go b/server/packs/snapshots.go index 39b0c915d..f46aecd4b 100644 --- a/server/packs/snapshots.go +++ b/server/packs/snapshots.go @@ -91,7 +91,7 @@ func storeSnapshot( ) pack.MinSyncedTicket = minSyncedTicket - if err := doc.ApplyChangePack(pack); err != nil { + if err := doc.ApplyChangePack(pack, be.Config.SnapshotDisableGC); err != nil { return err } diff --git a/test/integration/gc_test.go b/test/integration/gc_test.go index 9f5bae462..0713abd4f 100644 --- a/test/integration/gc_test.go +++ b/test/integration/gc_test.go @@ -20,14 +20,17 @@ package integration import ( "context" + "strconv" "testing" "github.com/stretchr/testify/assert" + "github.com/yorkie-team/yorkie/client" "github.com/yorkie-team/yorkie/pkg/document" "github.com/yorkie-team/yorkie/pkg/document/json" "github.com/yorkie-team/yorkie/pkg/document/presence" "github.com/yorkie-team/yorkie/pkg/document/time" + "github.com/yorkie-team/yorkie/server" "github.com/yorkie-team/yorkie/test/helper" ) @@ -602,4 +605,63 @@ func TestGarbageCollection(t *testing.T) { assert.Equal(t, `ad`, d2.Root().GetTree("tree").ToXML()) assert.Equal(t, 0, d1.GarbageLen()) }) + + t.Run("Should not collect the garbage if the DisableGC is true", func(t *testing.T) { + // 01. Create a new server with SnapshotDisableGC set to true + conf := helper.TestConfig() + conf.Backend.SnapshotDisableGC = true + conf.Backend.SnapshotThreshold = 10 + conf.Backend.SnapshotInterval = 10 + testServer, err := server.New(conf) + assert.NoError(t, err) + assert.NoError(t, testServer.Start()) + defer func() { + assert.NoError(t, testServer.Shutdown(true)) + }() + + ctx := context.Background() + c1, err := client.Dial(testServer.RPCAddr()) + assert.NoError(t, err) + assert.NoError(t, c1.Activate(ctx)) + defer func() { + assert.NoError(t, c1.Deactivate(ctx)) + assert.NoError(t, c1.Close()) + }() + + // 02. Create a document and update it to check if the garbage is collected + d1 := document.New(helper.TestDocKey(t), document.WithDisableGC()) + assert.NoError(t, c1.Attach(ctx, d1)) + defer func() { + assert.NoError(t, c1.Detach(ctx, d1)) + }() + + assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error { + root.SetNewText("text").Edit(0, 0, "-") + return nil + })) + for i := 0; i < int(conf.Backend.SnapshotInterval); i++ { + assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error { + root.GetText("text").Edit(0, 1, strconv.Itoa(i)) + return nil + })) + } + assert.Equal(t, int(conf.Backend.SnapshotInterval), d1.GarbageLen()) + assert.NoError(t, c1.Sync(ctx)) + + // 03. Check if the garbage is collected after the snapshot interval + c2, err := client.Dial(testServer.RPCAddr()) + assert.NoError(t, err) + assert.NoError(t, c2.Activate(ctx)) + defer func() { + assert.NoError(t, c2.Deactivate(ctx)) + assert.NoError(t, c2.Close()) + }() + + d2 := document.New(helper.TestDocKey(t), document.WithDisableGC()) + assert.NoError(t, c2.Attach(ctx, d2)) + defer func() { + assert.NoError(t, c2.Detach(ctx, d2)) + }() + assert.Equal(t, int(conf.Backend.SnapshotInterval), d2.GarbageLen()) + }) }