Skip to content

Commit

Permalink
indexers: fix indexer wait for sync.
Browse files Browse the repository at this point in the history
This fixes an issue with indexers wait for sync where the indexer gets
synced before a sync subcsriber is created. The sync subscriber is left
idling, waitingfor the next update which blocks the caller. The index
subscriber has been updated to periodically update sync subcribers
of all subscribed indexers to it as a result.
  • Loading branch information
dnldd committed Mar 28, 2022
1 parent 6ec3707 commit fcee27b
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 32 deletions.
12 changes: 11 additions & 1 deletion blockchain/indexers/addrindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ func (idx *AddrIndex) Init(ctx context.Context, chainParams *chaincfg.Params) er
}

// Recover the address index and its dependents to the main chain if needed.
if err := recover(ctx, idx); err != nil {
if err := recoverIndex(ctx, idx); err != nil {
return err
}

Expand Down Expand Up @@ -702,12 +702,22 @@ func (idx *AddrIndex) IndexSubscription() *IndexSubscription {
// Subscribers returns all client channels waiting for the next index update.
//
// This is part of the Indexer interface.
// Deprecated: This will be removed in the next major version bump.
func (idx *AddrIndex) Subscribers() map[chan bool]struct{} {
idx.mtx.Lock()
defer idx.mtx.Unlock()
return idx.subscribers
}

// NotifySyncSubscribers signals subscribers of an index sync update.
//
// This is part of the Indexer interface.
func (idx *AddrIndex) NotifySyncSubscribers() {
idx.mtx.Lock()
notifySyncSubscribers(idx.subscribers)
idx.mtx.Unlock()
}

// WaitForSync subscribes clients for the next index sync update.
//
// This is part of the Indexer interface.
Expand Down
6 changes: 6 additions & 0 deletions blockchain/indexers/addrindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,12 +456,16 @@ func TestAddrIndexAsync(t *testing.T) {
bk5a := addBlock(t, chain, &g, "bk5a")

// Resubscribe the indexes.
subber.mtx.Lock()
err = addrIdx.sub.stop()
subber.mtx.Unlock()
if err != nil {
t.Fatal(err)
}

subber.mtx.Lock()
err = txIdx.sub.stop()
subber.mtx.Unlock()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -619,7 +623,9 @@ func TestAddrIndexAsync(t *testing.T) {
}

// Drop the address index and resubscribe.
subber.mtx.Lock()
err = addrIdx.sub.stop()
subber.mtx.Unlock()
if err != nil {
t.Fatal(err)
}
Expand Down
30 changes: 17 additions & 13 deletions blockchain/indexers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,12 @@ type Indexer interface {
WaitForSync() chan bool

// Subscribers returns all client channels waiting for the next index update.
// Deprecated: This will be removed in the next major version bump.
Subscribers() map[chan bool]struct{}

// NotifySyncSubscribers signals subscribers of an index sync update.
// This should only be called when an index is synced.
NotifySyncSubscribers()
}

// IndexDropper provides a method to remove an index from the database. Indexers
Expand Down Expand Up @@ -460,9 +465,18 @@ func tip(db database.DB, key []byte) (int64, *chainhash.Hash, error) {
return int64(height), hash, err
}

// recover reverts the index to a block on the main chain by repeatedly
// notifySyncSubscribers signals provided subscribers the index subcribed to
// is synced.
func notifySyncSubscribers(subscribers map[chan bool]struct{}) {
for sub := range subscribers {
close(sub)
delete(subscribers, sub)
}
}

// recoverIndex reverts the index to a block on the main chain by repeatedly
// disconnecting the index tip if it is not on the main chain.
func recover(ctx context.Context, idx Indexer) error {
func recoverIndex(ctx context.Context, idx Indexer) error {
// Fetch the current tip for the index.
height, hash, err := idx.Tip()
if err != nil {
Expand Down Expand Up @@ -670,13 +684,6 @@ func upgradeIndex(ctx context.Context, indexer Indexer, genesisHash *chainhash.H
// maybeNotifySubscribers updates subscribers the index is synced when
// the tip is identical to the chain tip.
func maybeNotifySubscribers(ctx context.Context, indexer Indexer) error {
subs := indexer.Subscribers()

// Exit immediately if the index has no subscribers.
if len(subs) == 0 {
return nil
}

if interruptRequested(ctx) {
return indexerError(ErrInterruptRequested, interruptMsg)
}
Expand All @@ -689,10 +696,7 @@ func maybeNotifySubscribers(ctx context.Context, indexer Indexer) error {
}

if tipHeight == bestHeight && *bestHash == *tipHash {
for sub := range subs {
close(sub)
delete(subs, sub)
}
indexer.NotifySyncSubscribers()
}

return nil
Expand Down
12 changes: 11 additions & 1 deletion blockchain/indexers/existsaddrindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (idx *ExistsAddrIndex) Init(ctx context.Context, chainParams *chaincfg.Para

// Recover the exists address index and its dependents to the main
// chain if needed.
if err := recover(ctx, idx); err != nil {
if err := recoverIndex(ctx, idx); err != nil {
return err
}

Expand Down Expand Up @@ -197,12 +197,22 @@ func (idx *ExistsAddrIndex) IndexSubscription() *IndexSubscription {
// Subscribers returns all client channels waiting for the next index update.
//
// This is part of the Indexer interface.
// Deprecated: This will be removed in the next major version bump.
func (idx *ExistsAddrIndex) Subscribers() map[chan bool]struct{} {
idx.mtx.Lock()
defer idx.mtx.Unlock()
return idx.subscribers
}

// NotifySyncSubscribers signals subscribers of an index sync update.
//
// This is part of the Indexer interface.
func (idx *ExistsAddrIndex) NotifySyncSubscribers() {
idx.mtx.Lock()
notifySyncSubscribers(idx.subscribers)
idx.mtx.Unlock()
}

// WaitForSync subscribes clients for the next index sync update.
//
// This is part of the Indexer interface.
Expand Down
4 changes: 4 additions & 0 deletions blockchain/indexers/existsaddrindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ func TestExistsAddrIndexAsync(t *testing.T) {
bk5a := addBlock(t, chain, &g, "bk5a")

// Resubscribe the index.
subber.mtx.Lock()
err = idx.sub.stop()
subber.mtx.Unlock()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -223,7 +225,9 @@ func TestExistsAddrIndexAsync(t *testing.T) {
}

// Resubscribe the index.
subber.mtx.Lock()
err = idx.sub.stop()
subber.mtx.Unlock()
if err != nil {
t.Fatal(err)
}
Expand Down
78 changes: 63 additions & 15 deletions blockchain/indexers/indexsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/decred/dcrd/blockchain/v5/internal/progresslog"
"github.com/decred/dcrd/database/v3"
Expand All @@ -34,6 +35,10 @@ var (

// noPrereqs indicates no index prerequisites.
noPrereqs = "none"

// syncUpdateInterval is the time between periodically checking indexes
// and notifying their synchronization subscribers if synced.
syncUpdateInterval = time.Millisecond * 500
)

// IndexNtfn represents an index notification detailing a block connection
Expand Down Expand Up @@ -80,6 +85,8 @@ func newIndexSubscription(subber *IndexSubscriber, indexer Indexer, prereq strin

// stop prevents any future index updates from being delivered and
// unsubscribes the associated subscription.
//
// This must be called with the index subscriber mutex held for writes.
func (s *IndexSubscription) stop() error {

// If the subscription has a prerequisite, find it and remove the
Expand Down Expand Up @@ -109,9 +116,7 @@ func (s *IndexSubscription) stop() error {

// If the subscription is independent, remove it from the
// index subscriber's subscriptions.
s.mtx.Lock()
delete(s.subscriber.subscriptions, s.id)
s.mtx.Unlock()

return nil
}
Expand All @@ -125,6 +130,7 @@ type IndexSubscriber struct {
mtx sync.Mutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
quit chan struct{}
}

Expand Down Expand Up @@ -348,43 +354,85 @@ func (s *IndexSubscriber) CatchUp(ctx context.Context, db database.DB, queryer C
return nil
}

// Run relays index notifications to subscribed indexes.
// handleSyncSubscribers updates index sync subscribers when a subscribed
// indexer is fully synced.
//
// This should be run as a goroutine.
func (s *IndexSubscriber) Run(ctx context.Context) {
func (s *IndexSubscriber) handleSyncSubscribers(ctx context.Context) {
ticker := time.NewTicker(syncUpdateInterval)
defer ticker.Stop()

for {
select {
case ntfn := <-s.c:
// Relay the index update to subscribed indexes.
case <-ctx.Done():
s.wg.Done()
return

case <-ticker.C:
s.mtx.Lock()
for _, sub := range s.subscriptions {
err := updateIndex(ctx, sub.idx, &ntfn)
err := maybeNotifySubscribers(ctx, sub.idx)
if err != nil {
log.Error(err)
log.Errorf("unable to notify sync subscribers: %v", err)
s.cancel()
break
}
}
s.mtx.Unlock()
}
}
}

if ntfn.Done != nil {
close(ntfn.Done)
}

// handleIndexUpdates relays updates to subscribed indexers.
func (s *IndexSubscriber) handleIndexUpdates(ctx context.Context) {
for {
select {
case <-ctx.Done():
log.Infof("Index subscriber shutting down")

close(s.quit)

// Stop all updates to subscribed indexes and terminate their
// processes.
s.mtx.Lock()
for _, sub := range s.subscriptions {
err := sub.stop()
if err != nil {
log.Error("unable to stop index subscription: %v", err)
}
}
s.mtx.Unlock()

s.cancel()
s.wg.Done()

return

case ntfn := <-s.c:
// Relay the index update to subscribed indexes.
s.mtx.Lock()
for _, sub := range s.subscriptions {
err := updateIndex(ctx, sub.idx, &ntfn)
if err != nil {
log.Error(err)
s.cancel()
break
}
}
s.mtx.Unlock()

if ntfn.Done != nil {
close(ntfn.Done)
}
}
}
}

// Run relays index notifications to subscribed indexes.
//
// This should be run as a goroutine.
func (s *IndexSubscriber) Run(ctx context.Context) {
s.wg.Add(2)
go s.handleIndexUpdates(ctx)
go s.handleSyncSubscribers(ctx)
s.wg.Wait()

log.Infof("Index subscriber shutting down")
}
22 changes: 21 additions & 1 deletion blockchain/indexers/indexsubscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package indexers
import (
"context"
"testing"
"time"

"github.com/decred/dcrd/blockchain/v5/chaingen"
"github.com/decred/dcrd/chaincfg/v3"
Expand Down Expand Up @@ -37,7 +38,6 @@ func TestIndexSubscriberAsync(t *testing.T) {
defer pCancel()

subber := NewIndexSubscriber(ctx)
go subber.Run(ctx)

err = AddIndexSpendConsumers(db, chain)
if err != nil {
Expand All @@ -64,6 +64,8 @@ func TestIndexSubscriberAsync(t *testing.T) {
t.Fatal(err)
}

go subber.Run(ctx)

// Ensure all indexes through their prerequisite/dependency relationships
// are synced to the current chain tip (bk3).
addrIdxTipHeight, addrIdxTipHash, err := addrIdx.Tip()
Expand Down Expand Up @@ -110,6 +112,22 @@ func TestIndexSubscriberAsync(t *testing.T) {
existsAddrIdxTipHash)
}

// Create 3 sync subscribers for the tx index.
for idx := 0; idx < 3; idx++ {
txIdx.WaitForSync()
}

// Wait for the sync subscriber handler to run.
time.Sleep(time.Millisecond * 750)

// Ensure the tx index no longer has sync subscribers.
txIdx.mtx.Lock()
if len(txIdx.subscribers) > 0 {
txIdx.mtx.Unlock()
t.Fatalf("expected no sync subscribers for the tx index")
}
txIdx.mtx.Unlock()

// Ensure the address index remains in sync with the main chain when new
// blocks are connected.
bk4 := addBlock(t, chain, &g, "bk4")
Expand Down Expand Up @@ -168,7 +186,9 @@ func TestIndexSubscriberAsync(t *testing.T) {
}

// Ensure stopping a prequisite subscription stops its dependency as well.
subber.mtx.Lock()
err = txIdx.sub.stop()
subber.mtx.Unlock()
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit fcee27b

Please sign in to comment.