Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Reconciler Active/Inactive Concurrency Configuration #29

Merged
merged 2 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions reconciler/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,19 @@ import (
// falls back to the default value.
type Option func(r *Reconciler)

// WithReconcilerConcurrency overrides the default reconciler
// WithInactiveConcurrency overrides the default inactive
// concurrency.
func WithReconcilerConcurrency(concurrency int) Option {
func WithInactiveConcurrency(concurrency int) Option {
return func(r *Reconciler) {
r.reconcilerConcurrency = concurrency
r.inactiveConcurrency = concurrency
}
}

// WithActiveConcurrency overrides the default active
// concurrency.
func WithActiveConcurrency(concurrency int) Option {
return func(r *Reconciler) {
r.activeConcurrency = concurrency
}
}

Expand Down
72 changes: 37 additions & 35 deletions reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,6 @@ type Handler interface {
balance string,
block *types.BlockIdentifier,
) error

NewAccountSeen(
ctx context.Context,
account *types.AccountIdentifier,
currency *types.Currency,
) error
}

// InactiveEntry is used to track the last
Expand All @@ -174,21 +168,36 @@ type AccountCurrency struct {
// types.AccountIdentifiers returned in types.Operations
// by a Rosetta Server.
type Reconciler struct {
network *types.NetworkIdentifier
helper Helper
handler Handler
fetcher *fetcher.Fetcher
reconcilerConcurrency int
lookupBalanceByBlock bool
interestingAccounts []*AccountCurrency
changeQueue chan *parser.BalanceChange
network *types.NetworkIdentifier
helper Helper
handler Handler
fetcher *fetcher.Fetcher
lookupBalanceByBlock bool
interestingAccounts []*AccountCurrency
changeQueue chan *parser.BalanceChange

// Reconciler concurrency is separated between
// active and inactive concurrency to allow for
// fine-grained tuning of reconciler behavior.
// When there are many transactions in a block
// on a resource-constrained machine (laptop),
// it is useful to allocate more resources to
// active reconciliation as it is synchronous
// (when lookupBalanceByBlock is enabled).
activeConcurrency int
inactiveConcurrency int

// highWaterMark is used to skip requests when
// we are very far behind the live head.
highWaterMark int64

// seenAccounts are stored for inactive account
// reconciliation.
// reconciliation. seenAccounts must be stored
// separately from inactiveQueue to prevent duplicate
// accounts from being added to the inactive reconciliation
// queue. If this is not done, it is possible a goroutine
// could be processing an account (not in the queue) when
// we do a lookup to determine if we should add to the queue.
seenAccounts []*AccountCurrency
inactiveQueue []*InactiveEntry

Expand All @@ -206,14 +215,15 @@ func New(
options ...Option,
) *Reconciler {
r := &Reconciler{
network: network,
helper: helper,
handler: handler,
fetcher: fetcher,
reconcilerConcurrency: defaultReconcilerConcurrency,
highWaterMark: -1,
seenAccounts: []*AccountCurrency{},
inactiveQueue: []*InactiveEntry{},
network: network,
helper: helper,
handler: handler,
fetcher: fetcher,
activeConcurrency: defaultReconcilerConcurrency,
inactiveConcurrency: defaultReconcilerConcurrency,
highWaterMark: -1,
seenAccounts: []*AccountCurrency{},
inactiveQueue: []*InactiveEntry{},

// When lookupBalanceByBlock is enabled, we check
// balance changes synchronously.
Expand Down Expand Up @@ -479,7 +489,7 @@ func (r *Reconciler) accountReconciliation(
return nil
}

err = r.inactiveAccountQueue(ctx, inactive, accountCurrency, liveBlock)
err = r.inactiveAccountQueue(inactive, accountCurrency, liveBlock)
if err != nil {
return err
}
Expand All @@ -498,7 +508,6 @@ func (r *Reconciler) accountReconciliation(
}

func (r *Reconciler) inactiveAccountQueue(
ctx context.Context,
inactive bool,
accountCurrency *AccountCurrency,
liveBlock *types.BlockIdentifier,
Expand All @@ -508,15 +517,6 @@ func (r *Reconciler) inactiveAccountQueue(
if !inactive && !ContainsAccountCurrency(r.seenAccounts, accountCurrency) {
r.seenAccounts = append(r.seenAccounts, accountCurrency)
shouldEnqueueInactive = true

err := r.handler.NewAccountSeen(
ctx,
accountCurrency.Account,
accountCurrency.Currency,
)
if err != nil {
return err
}
}

if inactive || shouldEnqueueInactive {
Expand Down Expand Up @@ -632,11 +632,13 @@ func (r *Reconciler) reconcileInactiveAccounts(
// If any goroutine errors, the function will return an error.
func (r *Reconciler) Reconcile(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
for j := 0; j < r.reconcilerConcurrency/2; j++ {
for j := 0; j < r.activeConcurrency; j++ {
g.Go(func() error {
return r.reconcileActiveAccounts(ctx)
})
}

for j := 0; j < r.inactiveConcurrency; j++ {
g.Go(func() error {
return r.reconcileInactiveAccounts(ctx)
})
Expand Down
37 changes: 7 additions & 30 deletions reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ func TestNewReconciler(t *testing.T) {
},
"with reconciler concurrency": {
options: []Option{
WithReconcilerConcurrency(100),
WithInactiveConcurrency(100),
WithActiveConcurrency(200),
},
expected: func() *Reconciler {
r := templateReconciler()
r.reconcilerConcurrency = 100
r.inactiveConcurrency = 100
r.activeConcurrency = 200

return r
}(),
Expand Down Expand Up @@ -113,7 +115,8 @@ func TestNewReconciler(t *testing.T) {
assert.ElementsMatch(t, test.expected.inactiveQueue, result.inactiveQueue)
assert.ElementsMatch(t, test.expected.seenAccounts, result.seenAccounts)
assert.ElementsMatch(t, test.expected.interestingAccounts, result.interestingAccounts)
assert.Equal(t, test.expected.reconcilerConcurrency, result.reconcilerConcurrency)
assert.Equal(t, test.expected.inactiveConcurrency, result.inactiveConcurrency)
assert.Equal(t, test.expected.activeConcurrency, result.activeConcurrency)
assert.Equal(t, test.expected.lookupBalanceByBlock, result.lookupBalanceByBlock)
assert.Equal(t, cap(test.expected.changeQueue), cap(result.changeQueue))
})
Expand Down Expand Up @@ -520,13 +523,11 @@ func TestInactiveAccountQueue(t *testing.T) {

t.Run("new account in active reconciliation", func(t *testing.T) {
err := r.inactiveAccountQueue(
context.Background(),
false,
accountCurrency,
block,
)
assert.Nil(t, err)
assert.Equal(t, handler.LastAccountCurrency, accountCurrency)
assert.ElementsMatch(t, r.seenAccounts, []*AccountCurrency{accountCurrency})
assert.ElementsMatch(t, r.inactiveQueue, []*InactiveEntry{
{
Expand All @@ -538,13 +539,11 @@ func TestInactiveAccountQueue(t *testing.T) {

t.Run("another new account in active reconciliation", func(t *testing.T) {
err := r.inactiveAccountQueue(
context.Background(),
false,
accountCurrency2,
block2,
)
assert.Nil(t, err)
assert.Equal(t, handler.LastAccountCurrency, accountCurrency2)
assert.ElementsMatch(
t,
r.seenAccounts,
Expand All @@ -564,16 +563,13 @@ func TestInactiveAccountQueue(t *testing.T) {

t.Run("previous account in active reconciliation", func(t *testing.T) {
r.inactiveQueue = []*InactiveEntry{}
handler.LastAccountCurrency = nil

err := r.inactiveAccountQueue(
context.Background(),
false,
accountCurrency,
block,
)
assert.Nil(t, err)
assert.Nil(t, handler.LastAccountCurrency)
assert.ElementsMatch(
t,
r.seenAccounts,
Expand All @@ -584,13 +580,11 @@ func TestInactiveAccountQueue(t *testing.T) {

t.Run("previous account in inactive reconciliation", func(t *testing.T) {
err := r.inactiveAccountQueue(
context.Background(),
true,
accountCurrency,
block,
)
assert.Nil(t, err)
assert.Nil(t, handler.LastAccountCurrency)
assert.ElementsMatch(
t,
r.seenAccounts,
Expand All @@ -606,13 +600,11 @@ func TestInactiveAccountQueue(t *testing.T) {

t.Run("another previous account in inactive reconciliation", func(t *testing.T) {
err := r.inactiveAccountQueue(
context.Background(),
true,
accountCurrency2,
block2,
)
assert.Nil(t, err)
assert.Nil(t, handler.LastAccountCurrency)
assert.ElementsMatch(
t,
r.seenAccounts,
Expand All @@ -635,9 +627,7 @@ func templateReconciler() *Reconciler {
return New(nil, nil, nil, nil)
}

type MockReconcilerHandler struct {
LastAccountCurrency *AccountCurrency
}
type MockReconcilerHandler struct{}

func (h *MockReconcilerHandler) ReconciliationFailed(
ctx context.Context,
Expand All @@ -662,19 +652,6 @@ func (h *MockReconcilerHandler) ReconciliationSucceeded(
return nil
}

func (h *MockReconcilerHandler) NewAccountSeen(
ctx context.Context,
account *types.AccountIdentifier,
currency *types.Currency,
) error {
h.LastAccountCurrency = &AccountCurrency{
Account: account,
Currency: currency,
}

return nil
}

type MockReconcilerHelper struct {
HeadBlock *types.BlockIdentifier
StoredBlocks map[string]*types.Block
Expand Down