From e84195a59d9be321bc4e24e2a7f43dd289d657ec Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 4 Aug 2023 02:21:36 +0200 Subject: [PATCH 1/2] diff --- kv/memdb/memory_mutation.go | 63 ++++++++++++++++++++++++++++++++ kv/memdb/memory_mutation_diff.go | 53 +++++++++++++++++++++++++++ 2 files changed, 116 insertions(+) create mode 100644 kv/memdb/memory_mutation_diff.go diff --git a/kv/memdb/memory_mutation.go b/kv/memdb/memory_mutation.go index b86ef637f..fddcab6d3 100644 --- a/kv/memdb/memory_mutation.go +++ b/kv/memdb/memory_mutation.go @@ -17,6 +17,7 @@ import ( "bytes" "context" + "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv/iter" "github.com/ledgerwatch/erigon-lib/kv/order" "github.com/ledgerwatch/log/v3" @@ -378,6 +379,68 @@ func (m *MemoryMutation) Flush(tx kv.RwTx) error { return nil } +func (m *MemoryMutation) Diff() (*MemoryDiff, error) { + memDiff := &MemoryDiff{ + diff: make(map[table]map[string][]byte), + deletedEntries: make(map[string][]string), + } + // Obtain buckets touched. + buckets, err := m.memTx.ListBuckets() + if err != nil { + return nil, err + } + // Obliterate buckets who are to be deleted + for bucket := range m.clearedTables { + memDiff.clearedTableNames = append(memDiff.clearedTableNames, bucket) + } + // Obliterate entries who are to be deleted + for bucket, keys := range m.deletedEntries { + for key := range keys { + memDiff.deletedEntries[bucket] = append(memDiff.deletedEntries[bucket], key) + } + } + // Iterate over each bucket and apply changes accordingly. + for _, bucket := range buckets { + if isTablePurelyDupsort(bucket) { + cbucket, err := m.memTx.CursorDupSort(bucket) + if err != nil { + return nil, err + } + defer cbucket.Close() + + t := table{ + name: bucket, + dupsort: true, + } + memDiff.diff[t] = make(map[string][]byte) + for k, v, err := cbucket.First(); k != nil; k, v, err = cbucket.Next() { + if err != nil { + return nil, err + } + memDiff.diff[t][string(k)] = common.Copy(v) + } + } else { + cbucket, err := m.memTx.Cursor(bucket) + if err != nil { + return nil, err + } + defer cbucket.Close() + t := table{ + name: bucket, + dupsort: false, + } + memDiff.diff[t] = make(map[string][]byte) + for k, v, err := cbucket.First(); k != nil; k, v, err = cbucket.Next() { + if err != nil { + return nil, err + } + memDiff.diff[t][string(k)] = common.Copy(v) + } + } + } + return memDiff, nil +} + // Check if a bucket is dupsorted and has dupsort conversion off func isTablePurelyDupsort(bucket string) bool { config, ok := kv.ChaindataTablesCfg[bucket] diff --git a/kv/memdb/memory_mutation_diff.go b/kv/memdb/memory_mutation_diff.go new file mode 100644 index 000000000..01ce80a88 --- /dev/null +++ b/kv/memdb/memory_mutation_diff.go @@ -0,0 +1,53 @@ +package memdb + +import "github.com/ledgerwatch/erigon-lib/kv" + +type MemoryDiff struct { + diff map[table]map[string][]byte // god. + deletedEntries map[string][]string + clearedTableNames []string +} + +type table struct { + name string + dupsort bool +} + +func (m *MemoryDiff) Flush(tx kv.RwTx) error { + // Obliterate buckets who are to be deleted + for _, bucket := range m.clearedTableNames { + if err := tx.ClearBucket(bucket); err != nil { + return err + } + } + // Obliterate entries who are to be deleted + for bucket, keys := range m.deletedEntries { + for _, key := range keys { + if err := tx.Delete(bucket, []byte(key)); err != nil { + return err + } + } + } + // Iterate over each bucket and apply changes accordingly. + for bucketInfo, bucketDiff := range m.diff { + if bucketInfo.dupsort { + dbCursor, err := tx.RwCursorDupSort(bucketInfo.name) + if err != nil { + return err + } + defer dbCursor.Close() + for k, v := range bucketDiff { + if err := dbCursor.Put([]byte(k), v); err != nil { + return err + } + } + } else { + for k, v := range bucketDiff { + if err := tx.Put(bucketInfo.name, []byte(k), v); err != nil { + return err + } + } + } + } + return nil +} From 93200251b54dee547f61a1342c434b2c20aedbdf Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 4 Aug 2023 03:11:54 +0200 Subject: [PATCH 2/2] kv --- kv/memdb/memory_mutation.go | 14 +++++++++----- kv/memdb/memory_mutation_diff.go | 15 ++++++++++----- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/kv/memdb/memory_mutation.go b/kv/memdb/memory_mutation.go index fddcab6d3..bca696ddc 100644 --- a/kv/memdb/memory_mutation.go +++ b/kv/memdb/memory_mutation.go @@ -381,7 +381,7 @@ func (m *MemoryMutation) Flush(tx kv.RwTx) error { func (m *MemoryMutation) Diff() (*MemoryDiff, error) { memDiff := &MemoryDiff{ - diff: make(map[table]map[string][]byte), + diff: make(map[table][]entry), deletedEntries: make(map[string][]string), } // Obtain buckets touched. @@ -412,12 +412,14 @@ func (m *MemoryMutation) Diff() (*MemoryDiff, error) { name: bucket, dupsort: true, } - memDiff.diff[t] = make(map[string][]byte) for k, v, err := cbucket.First(); k != nil; k, v, err = cbucket.Next() { if err != nil { return nil, err } - memDiff.diff[t][string(k)] = common.Copy(v) + memDiff.diff[t] = append(memDiff.diff[t], entry{ + k: common.Copy(k), + v: common.Copy(v), + }) } } else { cbucket, err := m.memTx.Cursor(bucket) @@ -429,12 +431,14 @@ func (m *MemoryMutation) Diff() (*MemoryDiff, error) { name: bucket, dupsort: false, } - memDiff.diff[t] = make(map[string][]byte) for k, v, err := cbucket.First(); k != nil; k, v, err = cbucket.Next() { if err != nil { return nil, err } - memDiff.diff[t][string(k)] = common.Copy(v) + memDiff.diff[t] = append(memDiff.diff[t], entry{ + k: common.Copy(k), + v: common.Copy(v), + }) } } } diff --git a/kv/memdb/memory_mutation_diff.go b/kv/memdb/memory_mutation_diff.go index 01ce80a88..7f58b8a1d 100644 --- a/kv/memdb/memory_mutation_diff.go +++ b/kv/memdb/memory_mutation_diff.go @@ -2,8 +2,13 @@ package memdb import "github.com/ledgerwatch/erigon-lib/kv" +type entry struct { + k []byte + v []byte +} + type MemoryDiff struct { - diff map[table]map[string][]byte // god. + diff map[table][]entry // god. deletedEntries map[string][]string clearedTableNames []string } @@ -36,14 +41,14 @@ func (m *MemoryDiff) Flush(tx kv.RwTx) error { return err } defer dbCursor.Close() - for k, v := range bucketDiff { - if err := dbCursor.Put([]byte(k), v); err != nil { + for _, entry := range bucketDiff { + if err := dbCursor.Put(entry.k, entry.v); err != nil { return err } } } else { - for k, v := range bucketDiff { - if err := tx.Put(bucketInfo.name, []byte(k), v); err != nil { + for _, entry := range bucketDiff { + if err := tx.Put(bucketInfo.name, entry.k, entry.v); err != nil { return err } }