-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
prunedfreezer.go
339 lines (296 loc) · 10.1 KB
/
prunedfreezer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
package rawdb
import (
"math"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/prometheus/tsdb/fileutil"
)
// prunedfreezer not contain ancient data, only record 'frozen' , the next recycle block number form kvstore.
type prunedfreezer struct {
db ethdb.KeyValueStore // Meta database
// WARNING: The `frozen` field is accessed atomically. On 32 bit platforms, only
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
frozen uint64 // BlockNumber of next frozen block
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)
instanceLock fileutil.Releaser // File-system lock to prevent double opens
quit chan struct{}
closeOnce sync.Once
}
// newNoDataFreezer creates a chain freezer that deletes data enough ‘old’.
func newPrunedFreezer(datadir string, db ethdb.KeyValueStore, offset uint64) (*prunedfreezer, error) {
if info, err := os.Lstat(datadir); !os.IsNotExist(err) {
if info.Mode()&os.ModeSymlink != 0 {
log.Warn("Symbolic link ancient database is not supported", "path", datadir)
return nil, errSymlinkDatadir
}
}
lock, _, err := fileutil.Flock(filepath.Join(datadir, "../NODATA_ANCIENT_FLOCK"))
if err != nil {
return nil, err
}
freezer := &prunedfreezer{
db: db,
frozen: offset,
threshold: params.FullImmutabilityThreshold,
instanceLock: lock,
quit: make(chan struct{}),
}
if err := freezer.repair(datadir); err != nil {
return nil, err
}
// delete ancient dir
if err := os.RemoveAll(datadir); err != nil && !os.IsNotExist(err) {
log.Warn("Failed to remove the ancient dir", "path", datadir, "error", err)
return nil, err
}
log.Info("Opened ancientdb with nodata mode", "database", datadir, "frozen", freezer.frozen)
return freezer, nil
}
// repair init frozen , compatible disk-ancientdb and pruner-block-tool.
func (f *prunedfreezer) repair(datadir string) error {
offset := atomic.LoadUint64(&f.frozen)
// compatible freezer
min := uint64(math.MaxUint64)
for name, disableSnappy := range chainFreezerNoSnappy {
table, err := newFreezerTable(datadir, name, disableSnappy, false)
if err != nil {
return err
}
items := table.items.Load()
if min > items {
min = items
}
table.Close()
}
log.Info("Read ancientdb item counts", "items", min)
offset += min
if frozen := ReadFrozenOfAncientFreezer(f.db); frozen > offset {
offset = frozen
}
atomic.StoreUint64(&f.frozen, offset)
if err := f.Sync(); err != nil {
return nil
}
return nil
}
// Close terminates the chain prunedfreezer.
func (f *prunedfreezer) Close() error {
var err error
f.closeOnce.Do(func() {
close(f.quit)
f.Sync()
err = f.instanceLock.Release()
})
return err
}
// HasAncient returns an indicator whether the specified ancient data exists, return nil.
func (f *prunedfreezer) HasAncient(kind string, number uint64) (bool, error) {
return false, nil
}
// Ancient retrieves an ancient binary blob from prunedfreezer, return nil.
func (f *prunedfreezer) Ancient(kind string, number uint64) ([]byte, error) {
if _, ok := chainFreezerNoSnappy[kind]; ok {
if number >= atomic.LoadUint64(&f.frozen) {
return nil, errOutOfBounds
}
return nil, nil
}
return nil, errUnknownTable
}
// Ancients returns the last of the frozen items.
func (f *prunedfreezer) Ancients() (uint64, error) {
return atomic.LoadUint64(&f.frozen), nil
}
// ItemAmountInAncient returns the actual length of current ancientDB, return 0.
func (f *prunedfreezer) ItemAmountInAncient() (uint64, error) {
return 0, nil
}
// AncientOffSet returns the offset of current ancientDB, offset == frozen.
func (f *prunedfreezer) AncientOffSet() uint64 {
return atomic.LoadUint64(&f.frozen)
}
// MigrateTable processes the entries in a given table in sequence
// converting them to a new format if they're of an old format.
func (db *prunedfreezer) MigrateTable(kind string, convert convertLegacyFn) error {
return errNotSupported
}
// AncientDatadir returns an error as we don't have a backing chain freezer.
func (db *prunedfreezer) AncientDatadir() (string, error) {
return "", errNotSupported
}
// Tail returns the number of first stored item in the freezer.
func (f *prunedfreezer) Tail() (uint64, error) {
return 0, errNotSupported
}
// AncientSize returns the ancient size of the specified category, return 0.
func (f *prunedfreezer) AncientSize(kind string) (uint64, error) {
if _, ok := chainFreezerNoSnappy[kind]; ok {
return 0, nil
}
return 0, errUnknownTable
}
// AppendAncient update frozen.
//
// Notably, this function is lock free but kind of thread-safe. All out-of-order
// injection will be rejected. But if two injections with same number happen at
// the same time, we can get into the trouble.
func (f *prunedfreezer) AppendAncient(number uint64, hash, header, body, receipts, td []byte) (err error) {
if atomic.LoadUint64(&f.frozen) != number {
return errOutOrderInsertion
}
atomic.AddUint64(&f.frozen, 1)
return nil
}
// TruncateAncients discards any recent data above the provided threshold number, always success.
func (f *prunedfreezer) TruncateHead(items uint64) (uint64, error) {
preHead := atomic.LoadUint64(&f.frozen)
if preHead > items {
atomic.StoreUint64(&f.frozen, items)
WriteFrozenOfAncientFreezer(f.db, atomic.LoadUint64(&f.frozen))
}
return preHead, nil
}
// TruncateTail discards any recent data below the provided threshold number.
func (f *prunedfreezer) TruncateTail(tail uint64) (uint64, error) {
return 0, errNotSupported
}
// Sync flushes meta data tables to disk.
func (f *prunedfreezer) Sync() error {
WriteFrozenOfAncientFreezer(f.db, atomic.LoadUint64(&f.frozen))
// compatible offline prune blocks tool
WriteOffSetOfCurrentAncientFreezer(f.db, atomic.LoadUint64(&f.frozen))
return nil
}
// freeze is a background thread that periodically checks the blockchain for any
// import progress and moves ancient data from the fast database into the freezer.
//
// This functionality is deliberately broken off from block importing to avoid
// incurring additional data shuffling delays on block propagation.
func (f *prunedfreezer) freeze() {
nfdb := &nofreezedb{KeyValueStore: f.db}
var backoff bool
for {
select {
case <-f.quit:
log.Info("Freezer shutting down")
return
default:
}
if backoff {
select {
case <-time.NewTimer(freezerRecheckInterval).C:
case <-f.quit:
return
}
}
// Retrieve the freezing threshold.
hash := ReadHeadBlockHash(nfdb)
if hash == (common.Hash{}) {
log.Debug("Current full block hash unavailable") // new chain, empty database
backoff = true
continue
}
number := ReadHeaderNumber(nfdb, hash)
threshold := atomic.LoadUint64(&f.threshold)
switch {
case number == nil:
log.Error("Current full block number unavailable", "hash", hash)
backoff = true
continue
case *number < threshold:
log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", threshold)
backoff = true
continue
case *number-threshold <= f.frozen:
log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen)
backoff = true
continue
}
head := ReadHeader(nfdb, hash, *number)
if head == nil {
log.Error("Stable state block unavailable", "number", *number, "hash", hash)
backoff = true
continue
}
stableStabeNumber := ReadSafePointBlockNumber(nfdb)
switch {
case stableStabeNumber < params.StableStateThreshold:
log.Debug("Stable state block not old enough", "number", stableStabeNumber)
backoff = true
continue
case stableStabeNumber > *number:
log.Warn("Stable state block biger current full block", "number", stableStabeNumber, "number", *number)
backoff = true
continue
}
stableStabeNumber -= params.StableStateThreshold
// Seems we have data ready to be frozen, process in usable batches
limit := *number - threshold
if limit > stableStabeNumber {
limit = stableStabeNumber
}
if limit < f.frozen {
log.Debug("Stable state block has prune", "limit", limit, "frozen", f.frozen)
backoff = true
continue
}
if limit-f.frozen > freezerBatchLimit {
limit = f.frozen + freezerBatchLimit
}
var (
start = time.Now()
first = f.frozen
ancients = make([]common.Hash, 0, limit-f.frozen)
)
for f.frozen <= limit {
// Retrieves all the components of the canonical block
hash := ReadCanonicalHash(nfdb, f.frozen)
if hash == (common.Hash{}) {
log.Error("Canonical hash missing, can't freeze", "number", f.frozen)
}
log.Trace("Deep froze ancient block", "number", f.frozen, "hash", hash)
// Inject all the components into the relevant data tables
if err := f.AppendAncient(f.frozen, nil, nil, nil, nil, nil); err != nil {
log.Error("Append ancient err", "number", f.frozen, "hash", hash, "err", err)
break
}
if hash != (common.Hash{}) {
ancients = append(ancients, hash)
}
}
// Batch of blocks have been frozen, flush them before wiping from leveldb
if err := f.Sync(); err != nil {
log.Crit("Failed to flush frozen tables", "err", err)
}
backoff = f.frozen-first >= freezerBatchLimit
gcKvStore(f.db, ancients, first, f.frozen, start)
}
}
func (f *prunedfreezer) SetupFreezerEnv(env *ethdb.FreezerEnv) error {
return nil
}
func (f *prunedfreezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
return fn(f)
}
func (f *prunedfreezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
return nil, errNotSupported
}
func (f *prunedfreezer) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, error) {
return 0, errNotSupported
}
// TruncateTableTail will truncate certain table to new tail
func (f *prunedfreezer) TruncateTableTail(kind string, tail uint64) (uint64, error) {
return 0, errNotSupported
}
// ResetTable will reset certain table with new start point
func (f *prunedfreezer) ResetTable(kind string, startAt uint64, onlyEmpty bool) error {
return errNotSupported
}