-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Deneb DB methods #12379
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Deneb DB methods #12379
Changes from all commits
9f8a2c2
b45190d
fab4597
d1e6412
ab62427
b36491f
1ece755
66d81cb
7fa1277
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,252 @@ | ||
| package kv | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
| "fmt" | ||
| "sort" | ||
|
|
||
| "github.com/pkg/errors" | ||
| "github.com/prysmaticlabs/prysm/v4/config/params" | ||
| types "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" | ||
| "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" | ||
| ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" | ||
| bolt "go.etcd.io/bbolt" | ||
| "go.opencensus.io/trace" | ||
| ) | ||
|
|
||
| // SaveBlobSidecar saves the blobs for a given epoch in the sidecar bucket. When we receive a blob: | ||
| // | ||
| // 1. Convert slot using a modulo operator to [0, maxSlots] where maxSlots = MAX_BLOB_EPOCHS*SLOTS_PER_EPOCH | ||
| // | ||
| // 2. Compute key for blob as bytes(slot_to_rotating_buffer(blob.slot)) ++ bytes(blob.slot) ++ blob.block_root | ||
| // | ||
| // 3. Begin the save algorithm: If the incoming blob has a slot bigger than the saved slot at the spot | ||
| // in the rotating keys buffer, we overwrite all elements for that slot. | ||
| func (s *Store) SaveBlobSidecar(ctx context.Context, scs []*ethpb.BlobSidecar) error { | ||
| ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBlobSidecar") | ||
| defer span.End() | ||
|
|
||
| sortSideCars(scs) | ||
| if err := s.verifySideCars(scs); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| slot := scs[0].Slot | ||
| return s.db.Update(func(tx *bolt.Tx) error { | ||
| encodedBlobSidecar, err := encode(ctx, ðpb.BlobSidecars{Sidecars: scs}) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| bkt := tx.Bucket(blobsBucket) | ||
| c := bkt.Cursor() | ||
| newKey := blobSidecarKey(scs[0]) | ||
| rotatingBufferPrefix := newKey[0:8] | ||
| var replacingKey []byte | ||
| for k, _ := c.Seek(rotatingBufferPrefix); bytes.HasPrefix(k, rotatingBufferPrefix); k, _ = c.Next() { | ||
| if len(k) != 0 { | ||
| replacingKey = k | ||
| oldSlotBytes := replacingKey[8:16] | ||
| oldSlot := bytesutil.BytesToSlotBigEndian(oldSlotBytes) | ||
| if oldSlot >= slot { | ||
| return fmt.Errorf("attempted to save blob with slot %d but already have older blob with slot %d", slot, oldSlot) | ||
| } | ||
| break | ||
| } | ||
| } | ||
| // If there is no element stored at blob.slot % MAX_SLOTS_TO_PERSIST_BLOBS, then we simply | ||
| // store the blob by key and exit early. | ||
| if len(replacingKey) != 0 { | ||
| if err := bkt.Delete(replacingKey); err != nil { | ||
| log.WithError(err).Warnf("Could not delete blob with key %#x", replacingKey) | ||
| } | ||
| } | ||
| return bkt.Put(newKey, encodedBlobSidecar) | ||
| }) | ||
| } | ||
|
|
||
| // verifySideCars ensures that all sidecars have the same slot, parent root, block root, and proposer index. | ||
| // It also ensures that indices are sequential and start at 0 and no more than MAX_BLOB_EPOCHS. | ||
| func (s *Store) verifySideCars(scs []*ethpb.BlobSidecar) error { | ||
| if len(scs) == 0 { | ||
| return errors.New("nil or empty blob sidecars") | ||
| } | ||
| if uint64(len(scs)) > params.BeaconConfig().MaxBlobsPerBlock { | ||
| return fmt.Errorf("too many sidecars: %d > %d", len(scs), params.BeaconConfig().MaxBlobsPerBlock) | ||
| } | ||
|
|
||
| sl := scs[0].Slot | ||
| pr := scs[0].BlockParentRoot | ||
| r := scs[0].BlockRoot | ||
| p := scs[0].ProposerIndex | ||
|
|
||
| for i, sc := range scs { | ||
| if sc.Slot != sl { | ||
| return fmt.Errorf("sidecar slot mismatch: %d != %d", sc.Slot, sl) | ||
| } | ||
| if !bytes.Equal(sc.BlockParentRoot, pr) { | ||
| return fmt.Errorf("sidecar parent root mismatch: %x != %x", sc.BlockParentRoot, pr) | ||
| } | ||
| if !bytes.Equal(sc.BlockRoot, r) { | ||
| return fmt.Errorf("sidecar root mismatch: %x != %x", sc.BlockRoot, r) | ||
| } | ||
| if sc.ProposerIndex != p { | ||
| return fmt.Errorf("sidecar proposer index mismatch: %d != %d", sc.ProposerIndex, p) | ||
| } | ||
| if sc.Index != uint64(i) { | ||
| return fmt.Errorf("sidecar index mismatch: %d != %d", sc.Index, i) | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // sortSideCars sorts the sidecars by their index. | ||
| func sortSideCars(scs []*ethpb.BlobSidecar) { | ||
| sort.Slice(scs, func(i, j int) bool { | ||
| return scs[i].Index < scs[j].Index | ||
| }) | ||
| } | ||
|
|
||
| // BlobSidecarsByRoot retrieves the blobs for the given beacon block root. | ||
| // If the `indices` argument is omitted, all blobs for the root will be returned. | ||
| // Otherwise, the result will be filtered to only include the specified indices. | ||
| // An error will result if an invalid index is specified. | ||
| // The bucket size is bounded by 131072 entries. That's the most blobs a node will keep before rotating it out. | ||
| func (s *Store) BlobSidecarsByRoot(ctx context.Context, root [32]byte, indices ...uint64) ([]*ethpb.BlobSidecar, error) { | ||
| ctx, span := trace.StartSpan(ctx, "BeaconDB.BlobSidecarsByRoot") | ||
| defer span.End() | ||
|
|
||
| var enc []byte | ||
| if err := s.db.View(func(tx *bolt.Tx) error { | ||
| c := tx.Bucket(blobsBucket).Cursor() | ||
| // Bucket size is bounded and bolt cursors are fast. Moreover, a thin caching layer can be added. | ||
| for k, v := c.First(); k != nil; k, v = c.Next() { | ||
| if bytes.HasSuffix(k, root[:]) { | ||
| enc = v | ||
| break | ||
| } | ||
| } | ||
|
Comment on lines
+123
to
+128
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is going to be an O(n) lookup because you have to search every value in the bucket. Consider indexing the block roots in another bucket or putting the block root at the start of the string. Searching by prefix ought to be O(logn) if bbolt uses binary search. I'm not totally sure about the implementation yet. Also consider simply acknowledging this in the godoc since the bucket is bounded to a max size, it may never be much of an issue.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I considered multiple options and decided to remain the same to keep the implementation simple. I added a benchmark 9bcd36a to prove in the absolute worst case, every slot has the blob over the last 18 days (highly unlikely), to traverse through the list to get the last blob is no more than 2.3ms. This is entirely acceptable given this is not used in the hot path:
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It could be much faster but I'm OK with accepting it like this. Thanks for writing the benchmark and demonstrating the worst case. We can monitor these spans as well to see how much of an impact it is. |
||
| return nil | ||
| }); err != nil { | ||
| return nil, err | ||
| } | ||
| if enc == nil { | ||
| return nil, ErrNotFound | ||
| } | ||
| sc := ðpb.BlobSidecars{} | ||
| if err := decode(ctx, enc, sc); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return filterForIndices(sc, indices...) | ||
| } | ||
|
|
||
| func filterForIndices(sc *ethpb.BlobSidecars, indices ...uint64) ([]*ethpb.BlobSidecar, error) { | ||
| if len(indices) == 0 { | ||
| return sc.Sidecars, nil | ||
| } | ||
| // This loop assumes that the BlobSidecars value stores the complete set of blobs for a block | ||
| // in ascending order from eg 0..3, without gaps. This allows us to assume the indices argument | ||
| // maps 1:1 with indices in the BlobSidecars storage object. | ||
| maxIdx := uint64(len(sc.Sidecars)) - 1 | ||
| sidecars := make([]*ethpb.BlobSidecar, len(indices)) | ||
| for i, idx := range indices { | ||
| if idx > maxIdx { | ||
| return nil, errors.Wrapf(ErrNotFound, "BlobSidecars missing index: index %d", idx) | ||
| } | ||
| sidecars[i] = sc.Sidecars[idx] | ||
| } | ||
| return sidecars, nil | ||
| } | ||
|
|
||
| // BlobSidecarsBySlot retrieves BlobSidecars for the given slot. | ||
| // If the `indices` argument is omitted, all blobs for the root will be returned. | ||
| // Otherwise, the result will be filtered to only include the specified indices. | ||
| // An error will result if an invalid index is specified. | ||
| // The bucket size is bounded by 131072 entries. That's the most blobs a node will keep before rotating it out. | ||
| func (s *Store) BlobSidecarsBySlot(ctx context.Context, slot types.Slot, indices ...uint64) ([]*ethpb.BlobSidecar, error) { | ||
| ctx, span := trace.StartSpan(ctx, "BeaconDB.BlobSidecarsBySlot") | ||
| defer span.End() | ||
|
|
||
| var enc []byte | ||
| sk := slotKey(slot) | ||
| if err := s.db.View(func(tx *bolt.Tx) error { | ||
| c := tx.Bucket(blobsBucket).Cursor() | ||
| // Bucket size is bounded and bolt cursors are fast. Moreover, a thin caching layer can be added. | ||
| for k, v := c.Seek(sk); bytes.HasPrefix(k, sk); k, _ = c.Next() { | ||
| slotInKey := bytesutil.BytesToSlotBigEndian(k[8:16]) | ||
| if slotInKey == slot { | ||
| enc = v | ||
| break | ||
| } | ||
| } | ||
| return nil | ||
| }); err != nil { | ||
| return nil, err | ||
| } | ||
| if enc == nil { | ||
| return nil, ErrNotFound | ||
| } | ||
| sc := ðpb.BlobSidecars{} | ||
| if err := decode(ctx, enc, sc); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return filterForIndices(sc, indices...) | ||
| } | ||
|
|
||
| // DeleteBlobSidecar returns true if the blobs are in the db. | ||
| func (s *Store) DeleteBlobSidecar(ctx context.Context, beaconBlockRoot [32]byte) error { | ||
| _, span := trace.StartSpan(ctx, "BeaconDB.DeleteBlobSidecar") | ||
| defer span.End() | ||
| return s.db.Update(func(tx *bolt.Tx) error { | ||
| bkt := tx.Bucket(blobsBucket) | ||
| c := bkt.Cursor() | ||
| for k, _ := c.First(); k != nil; k, _ = c.Next() { | ||
| if bytes.HasSuffix(k, beaconBlockRoot[:]) { | ||
| if err := bkt.Delete(k); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| } | ||
| return nil | ||
| }) | ||
| } | ||
|
|
||
| // We define a blob sidecar key as: bytes(slot_to_rotating_buffer(blob.slot)) ++ bytes(blob.slot) ++ blob.block_root | ||
| // where slot_to_rotating_buffer(slot) = slot % MAX_SLOTS_TO_PERSIST_BLOBS. | ||
| func blobSidecarKey(blob *ethpb.BlobSidecar) []byte { | ||
| key := slotKey(blob.Slot) | ||
| key = append(key, bytesutil.SlotToBytesBigEndian(blob.Slot)...) | ||
| key = append(key, blob.BlockRoot...) | ||
| return key | ||
| } | ||
|
|
||
| func slotKey(slot types.Slot) []byte { | ||
| slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch | ||
| maxEpochsToPersistBlobs := params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest | ||
| maxSlotsToPersistBlobs := types.Slot(maxEpochsToPersistBlobs.Mul(uint64(slotsPerEpoch))) | ||
| return bytesutil.SlotToBytesBigEndian(slot.ModSlot(maxSlotsToPersistBlobs)) | ||
| } | ||
|
|
||
| func checkEpochsForBlobSidecarsRequestBucket(db *bolt.DB) error { | ||
| if err := db.Update(func(tx *bolt.Tx) error { | ||
| b := tx.Bucket(epochsForBlobSidecarsRequestBucket) | ||
| k := []byte("epoch-key") | ||
| v := b.Get(k) | ||
| if v == nil { | ||
| if err := b.Put(k, bytesutil.Uint64ToBytesBigEndian(uint64(params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest))); err != nil { | ||
| return err | ||
| } | ||
| return nil | ||
| } | ||
| e := bytesutil.BytesToUint64BigEndian(v) | ||
| if e != uint64(params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest) { | ||
| return fmt.Errorf("epochs for blobs request value in DB %d does not match config value %d", e, params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest) | ||
| } | ||
| return nil | ||
| }); err != nil { | ||
| return err | ||
| } | ||
| return nil | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.