Skip to content

Commit

Permalink
indexers: Provide interface for index removal.
Browse files Browse the repository at this point in the history
Indexes may now optionally provide their own implementation for
dropping the index, with a fallback to simply removing the index
bucket and metadata if not implemented.

Using an interface and dynamically dispatching to the correct drop
implementation also allowed removing a special case for deletion of
the transaction index from the common drop code.
  • Loading branch information
jrick committed Mar 27, 2018
1 parent 0fc5525 commit b092705
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 49 deletions.
7 changes: 6 additions & 1 deletion blockchain/indexers/addrindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,5 +1091,10 @@ func NewAddrIndex(db database.DB, chainParams *chaincfg.Params) *AddrIndex {
// DropAddrIndex drops the address index from the provided database if it
// exists.
func DropAddrIndex(db database.DB, interrupt <-chan struct{}) error {
return dropIndex(db, addrIndexKey, addrIndexName, interrupt)
return dropFlatIndex(db, addrIndexKey, addrIndexName, interrupt)
}

// DropIndex drops the address index from the provided database if it exists.
func (*AddrIndex) DropIndex(db database.DB, interrupt <-chan struct{}) error {
return DropAddrIndex(db, interrupt)
}
7 changes: 7 additions & 0 deletions blockchain/indexers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ type Indexer interface {
DisconnectBlock(dbTx database.Tx, block, parent *dcrutil.Block, view *blockchain.UtxoViewpoint) error
}

// IndexDropper provides a method to remove an index from the database. Indexers
// may implement this for a more efficient way of deleting themselves from the
// database rather than simply dropping a bucket.
type IndexDropper interface {
DropIndex(db database.DB, interrupt <-chan struct{}) error
}

// AssertError identifies an error that indicates an internal code consistency
// issue and should be treated as a critical and unrecoverable error.
type AssertError string
Expand Down
8 changes: 7 additions & 1 deletion blockchain/indexers/existsaddrindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,12 @@ func (idx *ExistsAddrIndex) AddUnconfirmedTx(tx *wire.MsgTx) {
// DropExistsAddrIndex drops the exists address index from the provided
// database if it exists.
func DropExistsAddrIndex(db database.DB, interrupt <-chan struct{}) error {
return dropIndex(db, existsAddrIndexKey, existsAddressIndexName,
return dropFlatIndex(db, existsAddrIndexKey, existsAddressIndexName,
interrupt)
}

// DropIndex drops the exists address index from the provided database if it
// exists.
func (*ExistsAddrIndex) DropIndex(db database.DB, interrupt <-chan struct{}) error {
return DropExistsAddrIndex(db, interrupt)
}
161 changes: 116 additions & 45 deletions blockchain/indexers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,18 @@ func (m *Manager) maybeFinishDrops(interrupt <-chan struct{}) error {
}

log.Infof("Resuming %s drop", indexer.Name())
err := dropIndex(m.db, indexer.Key(), indexer.Name(), interrupt)
if err != nil {
return err

switch d := indexer.(type) {
case IndexDropper:
err := d.DropIndex(m.db, interrupt)
if err != nil {
return err
}
default:
err := dropIndex(m.db, indexer.Key(), indexer.Name())
if err != nil {
return err
}
}
}

Expand Down Expand Up @@ -653,47 +662,32 @@ func NewManager(db database.DB, enabledIndexes []Indexer, params *chaincfg.Param
}
}

// dropIndex drops the passed index from the database. Since indexes can be
// massive, it deletes the index in multiple database transactions in order to
// keep memory usage to reasonable levels. It also marks the drop in progress
// so the drop can be resumed if it is stopped before it is done before the
// index can be used again.
func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan struct{}) error {
// Nothing to do if the index doesn't already exist.
var needsDelete bool
// existsIndex returns whether the index keyed by idxKey exists in the database.
func existsIndex(db database.DB, idxKey []byte, idxName string) (bool, error) {
var exists bool
err := db.View(func(dbTx database.Tx) error {
indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
if indexesBucket != nil && indexesBucket.Get(idxKey) != nil {
needsDelete = true
exists = true
}
return nil
})
if err != nil {
return err
}
if !needsDelete {
log.Infof("Not dropping %s because it does not exist", idxName)
return nil
}
return exists, err
}

// Mark that the index is in the process of being dropped so that it
// can be resumed on the next start if interrupted before the process is
// complete.
log.Infof("Dropping all %s entries. This might take a while...",
idxName)
err = db.Update(func(dbTx database.Tx) error {
// markIndexDeletion marks the index identified by idxKey for deletion. Marking
// an index for deletion allows deletion to resume next startup if an
// incremental deletion was interrupted.
func markIndexDeletion(db database.DB, idxKey []byte) error {
return db.Update(func(dbTx database.Tx) error {
indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
return indexesBucket.Put(indexDropKey(idxKey), idxKey)
})
if err != nil {
return err
}
}

// Since the indexes can be so large, attempting to simply delete
// the bucket in a single database transaction would result in massive
// memory usage and likely crash many systems due to ulimits. In order
// to avoid this, use a cursor to delete a maximum number of entries out
// of the bucket at a time.
// incrementalFlatDrop uses multiple database updates to remove key/value pairs
// saved to a flat index.
func incrementalFlatDrop(db database.DB, idxKey []byte, idxName string, interrupt <-chan struct{}) error {
const maxDeletions = 2000000
var totalDeleted uint64
for numDeleted := maxDeletions; numDeleted == maxDeletions; {
Expand Down Expand Up @@ -725,29 +719,106 @@ func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan s
return errInterruptRequested
}
}
return nil
}

// Call extra index specific deinitialization for the transaction index.
if idxName == txIndexName {
if err := dropBlockIDIndex(db); err != nil {
return err
}
}

// Remove the index tip, index bucket, and in-progress drop flag now
// that all index entries have been removed.
err = db.Update(func(dbTx database.Tx) error {
// dropIndexMetadata drops the passed index from the database by removing the
// top level bucket for the index, the index tip, and any in-progress drop flag.
func dropIndexMetadata(db database.DB, idxKey []byte, idxName string) error {
return db.Update(func(dbTx database.Tx) error {
meta := dbTx.Metadata()
indexesBucket := meta.Bucket(indexTipsBucketName)
if err := indexesBucket.Delete(idxKey); err != nil {
err := indexesBucket.Delete(idxKey)
if err != nil {
return err
}

if err := meta.DeleteBucket(idxKey); err != nil {
err = meta.DeleteBucket(idxKey)
if err != nil && !database.IsError(err, database.ErrBucketNotFound) {
return err
}

return indexesBucket.Delete(indexDropKey(idxKey))
})
}

// dropFlatIndex incrementally drops the passed index from the database. Since
// indexes can be massive, it deletes the index in multiple database
// transactions in order to keep memory usage to reasonable levels. For this
// algorithm to work, the index must be "flat" (have no nested buckets). It
// also marks the drop in progress so the drop can be resumed if it is stopped
// before it is done before the index can be used again.
func dropFlatIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan struct{}) error {
// Nothing to do if the index doesn't already exist.
exists, err := existsIndex(db, idxKey, idxName)
if err != nil {
return err
}
if !exists {
log.Infof("Not dropping %s because it does not exist", idxName)
return nil
}

log.Infof("Dropping all %s entries. This might take a while...",
idxName)

// Mark that the index is in the process of being dropped so that it
// can be resumed on the next start if interrupted before the process is
// complete.
err = markIndexDeletion(db, idxKey)
if err != nil {
return err
}

// Since the indexes can be so large, attempting to simply delete
// the bucket in a single database transaction would result in massive
// memory usage and likely crash many systems due to ulimits. In order
// to avoid this, use a cursor to delete a maximum number of entries out
// of the bucket at a time.
err = incrementalFlatDrop(db, idxKey, idxName, interrupt)
if err != nil {
return err
}

// Remove the index tip, index bucket, and in-progress drop flag now
// that all index entries have been removed.
err = dropIndexMetadata(db, idxKey, idxName)
if err != nil {
return err
}

log.Infof("Dropped %s", idxName)
return nil
}

// dropIndex drops the passed index from the database without using incremental
// deletion. This should be used to drop indexes containing nested buckets,
// which can not be deleted with dropFlatIndex.
func dropIndex(db database.DB, idxKey []byte, idxName string) error {
// Nothing to do if the index doesn't already exist.
exists, err := existsIndex(db, idxKey, idxName)
if err != nil {
return err
}
if !exists {
log.Infof("Not dropping %s because it does not exist", idxName)
return nil
}

log.Infof("Dropping all %s entries. This might take a while...",
idxName)

// Mark that the index is in the process of being dropped so that it
// can be resumed on the next start if interrupted before the process is
// complete.
err = markIndexDeletion(db, idxKey)
if err != nil {
return err
}

// Remove the index tip, index bucket, and in-progress drop flag. Removing
// the index bucket also recursively removes all values saved to the index.
err = dropIndexMetadata(db, idxKey, idxName)
if err != nil {
return err
}
Expand Down
58 changes: 56 additions & 2 deletions blockchain/indexers/txindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,10 +509,64 @@ func dropBlockIDIndex(db database.DB) error {
// exists. Since the address index relies on it, the address index will also be
// dropped when it exists.
func DropTxIndex(db database.DB, interrupt <-chan struct{}) error {
err := dropIndex(db, addrIndexKey, addrIndexName, interrupt)
// Nothing to do if the index doesn't already exist.
exists, err := existsIndex(db, txIndexKey, txIndexName)
if err != nil {
return err
}
if !exists {
log.Infof("Not dropping %s because it does not exist", txIndexName)
return nil
}

// Mark that the index is in the process of being dropped so that it
// can be resumed on the next start if interrupted before the process is
// complete.
err = markIndexDeletion(db, txIndexKey)
if err != nil {
return err
}

// Drop the address index if it exists, as it depends on the transaction
// index.
err = DropAddrIndex(db, interrupt)
if err != nil {
return err
}

log.Infof("Dropping all %s entries. This might take a while...",
txIndexName)

// Since the indexes can be so large, attempting to simply delete
// the bucket in a single database transaction would result in massive
// memory usage and likely crash many systems due to ulimits. In order
// to avoid this, use a cursor to delete a maximum number of entries out
// of the bucket at a time.
err = incrementalFlatDrop(db, txIndexKey, txIndexName, interrupt)
if err != nil {
return err
}

// Call extra index specific deinitialization for the transaction index.
err = dropBlockIDIndex(db)
if err != nil {
return err
}

// Remove the index tip, index bucket, and in-progress drop flag now
// that all index entries have been removed.
err = dropIndexMetadata(db, txIndexKey, txIndexName)
if err != nil {
return err
}

return dropIndex(db, txIndexKey, txIndexName, interrupt)
log.Infof("Dropped %s", txIndexName)
return nil
}

// DropIndex drops the transaction index from the provided database if it
// exists. Since the address index relies on it, the address index will also be
// dropped when it exists.
func (*TxIndex) DropIndex(db database.DB, interrupt <-chan struct{}) error {
return DropTxIndex(db, interrupt)
}
6 changes: 6 additions & 0 deletions database/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,9 @@ func (e Error) Error() string {
func makeError(c ErrorCode, desc string, err error) Error {
return Error{ErrorCode: c, Description: desc, Err: err}
}

// IsError returns whether err is an Error with a matching error code.
func IsError(err error, code ErrorCode) bool {
e, ok := err.(Error)
return ok && e.ErrorCode == code
}

0 comments on commit b092705

Please sign in to comment.