From 65179f1e4b612392b1426e0f7c0ba2b3ce09262e Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Tue, 10 Sep 2024 15:20:30 -0500 Subject: [PATCH 1/2] prune archived orders Prune archived orders when there are more than 1000 (configurable). --- client/app/config.go | 29 +++++--- client/core/core.go | 4 ++ client/db/bolt/db.go | 141 ++++++++++++++++++++++++++++++++++++++ client/db/bolt/db_test.go | 138 ++++++++++++++++++++++++++++++++++++- 4 files changed, 298 insertions(+), 14 deletions(-) diff --git a/client/app/config.go b/client/app/config.go index 53cd886cd1..7466c5a824 100644 --- a/client/app/config.go +++ b/client/app/config.go @@ -22,17 +22,18 @@ import ( ) const ( - defaultRPCCertFile = "rpc.cert" - defaultRPCKeyFile = "rpc.key" - defaultMainnetHost = "127.0.0.1" - defaultTestnetHost = "127.0.0.2" - defaultSimnetHost = "127.0.0.3" - walletPairOneHost = "127.0.0.6" - walletPairTwoHost = "127.0.0.7" - defaultRPCPort = "5757" - defaultWebPort = "5758" - defaultLogLevel = "debug" - configFilename = "dexc.conf" + defaultRPCCertFile = "rpc.cert" + defaultRPCKeyFile = "rpc.key" + defaultMainnetHost = "127.0.0.1" + defaultTestnetHost = "127.0.0.2" + defaultSimnetHost = "127.0.0.3" + walletPairOneHost = "127.0.0.6" + walletPairTwoHost = "127.0.0.7" + defaultRPCPort = "5757" + defaultWebPort = "5758" + defaultLogLevel = "debug" + configFilename = "dexc.conf" + defaultArchiveSizeLimit = 1000 ) var ( @@ -106,6 +107,8 @@ type CoreConfig struct { UnlockCoinsOnLogin bool `long:"release-wallet-coins" description:"On login or wallet creation, instruct the wallet to release any coins that it may have locked."` ExtensionModeFile string `long:"extension-mode-file" description:"path to a file that specifies options for running core as an extension."` + + ArchiveSizeLimit uint64 `long:"archivesize" description:"the maximum number of orders to be archived before deleting the oldest"` } // WebConfig encapsulates the configuration needed for the web server. @@ -213,6 +216,7 @@ func (cfg *Config) Core(log dex.Logger) *core.Config { NoAutoWalletLock: cfg.NoAutoWalletLock, NoAutoDBBackup: cfg.NoAutoDBBackup, ExtensionModeFile: cfg.ExtensionModeFile, + ArchiveSizeLimit: cfg.ArchiveSizeLimit, } } @@ -223,6 +227,9 @@ var DefaultConfig = Config{ RPCConfig: RPCConfig{ CertHosts: []string{defaultTestnetHost, defaultSimnetHost, defaultMainnetHost}, }, + CoreConfig: CoreConfig{ + ArchiveSizeLimit: defaultArchiveSizeLimit, + }, } // ParseCLIConfig parses the command-line arguments into the provided struct diff --git a/client/core/core.go b/client/core/core.go index b67d5c7ae3..5339080e15 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -1433,6 +1433,9 @@ type Config struct { // for running core in extension mode, which gives the caller options for // e.g. limiting the ability to configure wallets. ExtensionModeFile string + // ArchiveSizeLimit is the maximum number of orders that will be archived + // before we start deleting the oldest. + ArchiveSizeLimit uint64 } // locale is data associated with the currently selected language. @@ -1515,6 +1518,7 @@ func New(cfg *Config) (*Core, error) { } dbOpts := bolt.Opts{ BackupOnShutdown: !cfg.NoAutoDBBackup, + ArchiveSizeLimit: cfg.ArchiveSizeLimit, } boltDB, err := bolt.NewDB(cfg.DBPath, cfg.Logger.SubLogger("DB"), dbOpts) if err != nil { diff --git a/client/db/bolt/db.go b/client/db/bolt/db.go index 09130b986c..ea7a4d97b5 100644 --- a/client/db/bolt/db.go +++ b/client/db/bolt/db.go @@ -14,6 +14,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "decred.org/dcrdex/client/db" @@ -138,6 +139,7 @@ var ( // Opts is a set of options for the DB. type Opts struct { BackupOnShutdown bool // default is true + ArchiveSizeLimit uint64 } var defaultOpts = Opts{ @@ -234,8 +236,29 @@ func (db *BoltDB) fileSize(path string) int64 { // Run waits for context cancellation and closes the database. func (db *BoltDB) Run(ctx context.Context) { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + tick := time.After(time.Minute) + for { + select { + case <-tick: + case <-ctx.Done(): + return + } + if err := db.pruneArchivedOrders(); err != nil { + db.log.Errorf("Error cleaning archive: %v", err) + } + tick = time.After(time.Minute * 30) + } + }() + <-ctx.Done() // wait for shutdown to backup and compact + // Wait for archive cleaner to exit. + wg.Wait() + // Create a backup in the backups folder. if db.opts.BackupOnShutdown { db.log.Infof("Backing up database...") @@ -408,6 +431,124 @@ func (db *BoltDB) SetPrimaryCredentials(creds *dexdb.PrimaryCredentials) error { }) } +func (db *BoltDB) pruneArchivedOrders() error { + var archiveSizeLimit uint64 = 1000 + if db.opts.ArchiveSizeLimit != 0 { + archiveSizeLimit = db.opts.ArchiveSizeLimit + } + + return db.Update(func(tx *bbolt.Tx) error { + archivedOB := tx.Bucket(archivedOrdersBucket) + if archivedOB == nil { + return fmt.Errorf("failed to open %s bucket", string(archivedOrdersBucket)) + } + + // We won't delete any orders with active matches. + activeMatches := tx.Bucket(activeMatchesBucket) + if activeMatches == nil { + return fmt.Errorf("failed to open %s bucket", string(activeMatchesBucket)) + } + oidsWithActiveMatches := make(map[order.OrderID]struct{}, 0) + if err := activeMatches.ForEach(func(k, _ []byte) error { + mBkt := activeMatches.Bucket(k) + if mBkt == nil { + return fmt.Errorf("error getting match bucket %x", k) + } + var oid order.OrderID + copy(oid[:], mBkt.Get(orderIDKey)) + oidsWithActiveMatches[oid] = struct{}{} + return nil + }); err != nil { + return fmt.Errorf("error building active match order ID index: %w", err) + } + + nOrds := uint64(archivedOB.Stats().BucketN - 1 /* BucketN includes top bucket */) + if nOrds <= archiveSizeLimit { + return nil + } + + toClear := int(nOrds - archiveSizeLimit) + + type orderStamp struct { + oid []byte + stamp int64 + } + deletes := make([]*orderStamp, 0, toClear) + sortDeletes := func() { + sort.Slice(deletes, func(i, j int) bool { + return deletes[i].stamp < deletes[j].stamp + }) + } + if err := archivedOB.ForEach(func(oidB, v []byte) error { + var oid order.OrderID + copy(oid[:], oidB) + if _, found := oidsWithActiveMatches[oid]; found { + return nil + } + ord, err := decodeOrderBucket(oidB, archivedOB.Bucket(oidB)) + if err != nil { + return fmt.Errorf("error decoding order %x: %v", oid, err) + } + stamp := ord.Order.Prefix().ClientTime.Unix() + if len(deletes) < toClear { + deletes = append(deletes, &orderStamp{ + stamp: stamp, + oid: oidB, + }) + sortDeletes() + return nil + } + if stamp > deletes[len(deletes)-1].stamp { + return nil + } + deletes[len(deletes)-1] = &orderStamp{ + stamp: stamp, + oid: oidB, + } + sortDeletes() + return nil + }); err != nil { + return fmt.Errorf("archive iteration error: %v", err) + } + + deletedOrders := make(map[order.OrderID]struct{}) + for _, del := range deletes { + var oid order.OrderID + copy(oid[:], del.oid) + deletedOrders[oid] = struct{}{} + if err := archivedOB.DeleteBucket(del.oid); err != nil { + return fmt.Errorf("error deleting archived order %q: %v", del.oid, err) + } + } + + matchesToDelete := make([][]byte, 0, archiveSizeLimit /* just avoid some allocs if we can */) + archivedMatches := tx.Bucket(archivedMatchesBucket) + if archivedMatches == nil { + return errors.New("no archived match bucket") + } + if err := archivedMatches.ForEach(func(k, _ []byte) error { + matchBkt := archivedMatches.Bucket(k) + if matchBkt == nil { + return fmt.Errorf("no bucket found for %x during iteration", k) + } + var oid order.OrderID + copy(oid[:], matchBkt.Get(orderIDKey)) + if _, found := deletedOrders[oid]; found { + matchesToDelete = append(matchesToDelete, k) + } + return nil + }); err != nil { + return fmt.Errorf("error finding matches to prune: %w", err) + } + for i := range matchesToDelete { + if err := archivedMatches.DeleteBucket(matchesToDelete[i]); err != nil { + return fmt.Errorf("error deleting pruned match %x: %w", matchesToDelete[i], err) + } + } + return nil + }) +} + // validateCreds checks that the PrimaryCredentials fields are properly // populated. func validateCreds(creds *dexdb.PrimaryCredentials) error { diff --git a/client/db/bolt/db_test.go b/client/db/bolt/db_test.go index f34d096fd2..dc26b9a557 100644 --- a/client/db/bolt/db_test.go +++ b/client/db/bolt/db_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "flag" "fmt" "math/rand" "os" @@ -21,13 +22,14 @@ import ( ) var ( - tLogger = dex.StdOutLogger("db_TEST", dex.LevelTrace) + tLogger = dex.StdOutLogger("db_TEST", dex.LevelTrace) + withLongTests bool ) -func newTestDB(t *testing.T) (*BoltDB, func()) { +func newTestDB(t *testing.T, opts ...Opts) (*BoltDB, func()) { t.Helper() dbPath := filepath.Join(t.TempDir(), "db.db") - dbi, err := NewDB(dbPath, tLogger) + dbi, err := NewDB(dbPath, tLogger, opts...) if err != nil { t.Fatalf("error creating dB: %v", err) } @@ -50,6 +52,9 @@ func newTestDB(t *testing.T) (*BoltDB, func()) { } func TestMain(m *testing.M) { + flag.BoolVar(&withLongTests, "withlongtests", false, "include tests that take a long time to run") + flag.Parse() + defer os.Stdout.Sync() os.Exit(m.Run()) } @@ -1148,6 +1153,10 @@ func testCredentialsUpdate(t *testing.T, boltdb *BoltDB, tester func([]byte, str } func TestDeleteInactiveMatches(t *testing.T) { + // TODO: This test takes way too long to run. Why? + if !withLongTests { + return + } boltdb, shutdown := newTestDB(t) defer shutdown() @@ -1326,6 +1335,10 @@ func TestDeleteInactiveMatches(t *testing.T) { } func TestDeleteInactiveOrders(t *testing.T) { + // TODO: This test takes way too long to run. Why? + if !withLongTests { + return + } boltdb, shutdown := newTestDB(t) defer shutdown() @@ -1599,3 +1612,122 @@ func TestPokes(t *testing.T) { t.Fatal("Result from second LoadPokes wasn't empty") } } + +func TestPruneArchivedOrders(t *testing.T) { + const host = "blah" + const archiveSizeLimit = 5 + boltdb, shutdown := newTestDB(t, Opts{ArchiveSizeLimit: archiveSizeLimit}) + defer shutdown() + + archivedOrdersN := func() (n int) { + boltdb.View(func(tx *bbolt.Tx) error { + n = tx.Bucket(archivedOrdersBucket).Stats().BucketN - 1 /* BucketN includes top bucket */ + return nil + }) + return n + } + + var ordStampI int64 + addOrder := func(optStamp int64) order.OrderID { + ord, _ := ordertest.RandomLimitOrder() + if optStamp != 0 { + ord.P.ClientTime = time.Unix(optStamp, 0) + } else { + ord.P.ClientTime = time.Unix(ordStampI, 0) + ordStampI++ + } + boltdb.UpdateOrder(&db.MetaOrder{ + MetaData: &db.OrderMetaData{ + Status: order.OrderStatusExecuted, + Host: host, + Proof: db.OrderProof{DEXSig: []byte{0xa}}, + }, + Order: ord, + }) + return ord.ID() + } + for i := 0; i < archiveSizeLimit*2; i++ { + addOrder(0) + } + + if n := archivedOrdersN(); n != archiveSizeLimit*2 { + t.Fatalf("Expected %d archived orders after intitialization, saw %d", archiveSizeLimit*2, n) + } + + if err := boltdb.pruneArchivedOrders(); err != nil { + t.Fatalf("pruneArchivedOrders error: %v", err) + } + + if n := archivedOrdersN(); n != archiveSizeLimit { + t.Fatalf("Expected %d archived orders after first pruning, saw %d", archiveSizeLimit, n) + } + + // Make sure we pruned the first 5. + if err := boltdb.View(func(tx *bbolt.Tx) error { + bkt := tx.Bucket(archivedOrdersBucket) + return bkt.ForEach(func(oidB, _ []byte) error { + ord, err := decodeOrderBucket(oidB, bkt.Bucket(oidB)) + if err != nil { + return fmt.Errorf("error decoding order %x: %v", oidB, err) + } + if stamp := ord.Order.Prefix().ClientTime.Unix(); stamp < int64(archiveSizeLimit) { + return fmt.Errorf("order stamp %d should have been pruned", stamp) + } + return nil + }) + }); err != nil { + t.Fatal(err) + } + + // Add an order with an early stamp and an active match + oid := addOrder(1) + m := &db.MetaMatch{ + MetaData: &db.MatchMetaData{ + DEX: host, + Base: 1, + }, + UserMatch: ordertest.RandomUserMatch(), + } + m.OrderID = oid + m.Status = order.NewlyMatched + if err := boltdb.UpdateMatch(m); err != nil { + t.Fatal(err) + } + + if err := boltdb.pruneArchivedOrders(); err != nil { + t.Fatalf("Error pruning orders when one has an active match: %v", err) + } + + if n := archivedOrdersN(); n != archiveSizeLimit { + t.Fatalf("Expected %d archived orders after pruning with active match order in place, saw %d", archiveSizeLimit, n) + } + + // Our active match order should still be available + if _, err := boltdb.Order(oid); err != nil { + t.Fatalf("Error retrieving unpruned active match order: %v", err) + } + + // Retire the active match order + m.Status = order.MatchComplete + if err := boltdb.UpdateMatch(m); err != nil { + t.Fatal(err) + } + // Add an order to push the now retirable older order out + addOrder(0) + if err := boltdb.pruneArchivedOrders(); err != nil { + t.Fatalf("Error pruning orders after retiring match: %v", err) + } + if n := archivedOrdersN(); n != archiveSizeLimit { + t.Fatalf("Expected %d archived orders after pruning with retired match, saw %d", archiveSizeLimit, n) + } + // Match should not be archived any longer. + metaID := m.MatchOrderUniqueID() + if err := boltdb.matchesView(func(mb, archivedMB *bbolt.Bucket) error { + if mb.Bucket(metaID) != nil || archivedMB.Bucket(metaID) != nil { + return errors.New("still found bucket for retired match of pruned order") + } + return nil + }); err != nil { + t.Fatal(err) + } +} From 4bab10fbdd3db83a61505d5affcb887b1112bf4e Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Sun, 15 Sep 2024 19:23:10 -0500 Subject: [PATCH 2/2] use update time instead of decoding order --- client/db/bolt/db.go | 15 ++++++++++----- client/db/bolt/db_test.go | 23 +++++++++++------------ 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/client/db/bolt/db.go b/client/db/bolt/db.go index ea7a4d97b5..5087612bac 100644 --- a/client/db/bolt/db.go +++ b/client/db/bolt/db.go @@ -471,7 +471,7 @@ func (db *BoltDB) pruneArchivedOrders() error { type orderStamp struct { oid []byte - stamp int64 + stamp uint64 } deletes := make([]*orderStamp, 0, toClear) sortDeletes := func() { @@ -485,11 +485,16 @@ func (db *BoltDB) pruneArchivedOrders() error { if _, found := oidsWithActiveMatches[oid]; found { return nil } - ord, err := decodeOrderBucket(oidB, archivedOB.Bucket(oidB)) - if err != nil { - return fmt.Errorf("error decoding order %x: %v", oid, err) + oBkt := archivedOB.Bucket(oidB) + if oBkt == nil { + return fmt.Errorf("no order bucket iterated order %x", oidB) + } + stampB := oBkt.Get(updateTimeKey) + if stampB == nil { + // Highly improbable. + stampB = make([]byte, 8) } - stamp := ord.Order.Prefix().ClientTime.Unix() + stamp := intCoder.Uint64(stampB) if len(deletes) < toClear { deletes = append(deletes, &orderStamp{ stamp: stamp, diff --git a/client/db/bolt/db_test.go b/client/db/bolt/db_test.go index dc26b9a557..d17193f16f 100644 --- a/client/db/bolt/db_test.go +++ b/client/db/bolt/db_test.go @@ -1627,13 +1627,11 @@ func TestPruneArchivedOrders(t *testing.T) { return n } - var ordStampI int64 - addOrder := func(optStamp int64) order.OrderID { + var ordStampI uint64 + addOrder := func(stamp uint64) order.OrderID { ord, _ := ordertest.RandomLimitOrder() - if optStamp != 0 { - ord.P.ClientTime = time.Unix(optStamp, 0) - } else { - ord.P.ClientTime = time.Unix(ordStampI, 0) + if stamp == 0 { + stamp = ordStampI ordStampI++ } boltdb.UpdateOrder(&db.MetaOrder{ @@ -1644,7 +1642,12 @@ func TestPruneArchivedOrders(t *testing.T) { }, Order: ord, }) - return ord.ID() + oid := ord.ID() + boltdb.ordersUpdate(func(ob, archivedOB *bbolt.Bucket) error { + archivedOB.Bucket(oid[:]).Put(updateTimeKey, uint64Bytes(stamp)) + return nil + }) + return oid } for i := 0; i < archiveSizeLimit*2; i++ { addOrder(0) @@ -1666,11 +1669,7 @@ func TestPruneArchivedOrders(t *testing.T) { if err := boltdb.View(func(tx *bbolt.Tx) error { bkt := tx.Bucket(archivedOrdersBucket) return bkt.ForEach(func(oidB, _ []byte) error { - ord, err := decodeOrderBucket(oidB, bkt.Bucket(oidB)) - if err != nil { - return fmt.Errorf("error decoding order %x: %v", oidB, err) - } - if stamp := ord.Order.Prefix().ClientTime.Unix(); stamp < int64(archiveSizeLimit) { + if stamp := intCoder.Uint64(bkt.Bucket(oidB).Get(updateTimeKey)); stamp < archiveSizeLimit { return fmt.Errorf("order stamp %d should have been pruned", stamp) } return nil