Skip to content

Commit

Permalink
ingest/ledgerbackend: Refactor captive core process manager (#5360)
Browse files (browse the repository at this point in the history)
  • Loading branch information
tamirms authored Jun 28, 2024
1 parent 01dc176 commit b589529
Show file tree
Hide file tree
Showing 14 changed files with 738 additions and 649 deletions.
34 changes: 18 additions & 16 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,11 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error
)
}

c.stellarCoreRunner = c.stellarCoreRunnerFactory()
err = c.stellarCoreRunner.catchup(from, to)
if err != nil {
stellarCoreRunner := c.stellarCoreRunnerFactory()
if err = stellarCoreRunner.catchup(from, to); err != nil {
return errors.Wrap(err, "error running stellar-core")
}
c.stellarCoreRunner = stellarCoreRunner

// The next ledger should be the first ledger of the checkpoint containing
// the requested ledger
Expand All @@ -375,11 +375,11 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
return errors.Wrap(err, "error calculating ledger and hash for stellar-core run")
}

c.stellarCoreRunner = c.stellarCoreRunnerFactory()
err = c.stellarCoreRunner.runFrom(runFrom, ledgerHash)
if err != nil {
stellarCoreRunner := c.stellarCoreRunnerFactory()
if err = stellarCoreRunner.runFrom(runFrom, ledgerHash); err != nil {
return errors.Wrap(err, "error running stellar-core")
}
c.stellarCoreRunner = stellarCoreRunner

// In the online mode we update nextLedger after streaming the first ledger.
// This is to support versions before and after/including v17.1.0 that
Expand Down Expand Up @@ -556,7 +556,7 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool {
return false
}

if exited, _ := c.stellarCoreRunner.getProcessExitError(); exited {
if _, exited := c.stellarCoreRunner.getProcessExitError(); exited {
return false
}

Expand Down Expand Up @@ -627,9 +627,6 @@ func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xd
if c.stellarCoreRunner == nil {
return xdr.LedgerCloseMeta{}, errors.New("stellar-core cannot be nil, call PrepareRange first")
}
if c.closed {
return xdr.LedgerCloseMeta{}, errors.New("stellar-core has an error, call PrepareRange first")
}

if sequence < c.nextExpectedSequence() {
return xdr.LedgerCloseMeta{}, errors.Errorf(
Expand All @@ -647,12 +644,17 @@ func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xd
)
}

ch, ok := c.stellarCoreRunner.getMetaPipe()
if !ok {
return xdr.LedgerCloseMeta{}, errors.New("stellar-core is not running, call PrepareRange first")
}

// Now loop along the range until we find the ledger we want.
for {
select {
case <-ctx.Done():
return xdr.LedgerCloseMeta{}, ctx.Err()
case result, ok := <-c.stellarCoreRunner.getMetaPipe():
case result, ok := <-ch:
found, ledger, err := c.handleMetaPipeResult(sequence, result, ok)
if found || err != nil {
return ledger, err
Expand Down Expand Up @@ -732,7 +734,7 @@ func (c *CaptiveStellarCore) checkMetaPipeResult(result metaResult, ok bool) err
return err
}
if !ok || result.err != nil {
exited, err := c.stellarCoreRunner.getProcessExitError()
err, exited := c.stellarCoreRunner.getProcessExitError()
if exited && err != nil {
// Case 2 - The stellar core process exited unexpectedly with an error message
return errors.Wrap(err, "stellar core exited unexpectedly")
Expand Down Expand Up @@ -775,12 +777,12 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3
if c.stellarCoreRunner == nil {
return 0, errors.New("stellar-core cannot be nil, call PrepareRange first")
}
if c.closed {
return 0, errors.New("stellar-core is closed, call PrepareRange first")

ch, ok := c.stellarCoreRunner.getMetaPipe()
if !ok {
return 0, errors.New("stellar-core is not running, call PrepareRange first")
}
if c.lastLedger == nil {
return c.nextExpectedSequence() - 1 + uint32(len(c.stellarCoreRunner.getMetaPipe())), nil
return c.nextExpectedSequence() - 1 + uint32(len(ch)), nil
}
return *c.lastLedger, nil
}
Expand Down
64 changes: 31 additions & 33 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ func (m *stellarCoreRunnerMock) runFrom(from uint32, hash string) error {
return a.Error(0)
}

func (m *stellarCoreRunnerMock) getMetaPipe() <-chan metaResult {
func (m *stellarCoreRunnerMock) getMetaPipe() (<-chan metaResult, bool) {
a := m.Called()
return a.Get(0).(<-chan metaResult)
return a.Get(0).(<-chan metaResult), a.Bool(1)
}

func (m *stellarCoreRunnerMock) getProcessExitError() (bool, error) {
func (m *stellarCoreRunnerMock) getProcessExitError() (error, bool) {
a := m.Called()
return a.Bool(0), a.Error(1)
return a.Error(0), a.Bool(1)
}

func (m *stellarCoreRunnerMock) close() error {
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestCaptivePrepareRange(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)

mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -251,8 +251,8 @@ func TestCaptivePrepareRangeCrash(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once()
mockRunner.On("getProcessExitError").Return(true, errors.New("exit code -1"))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getProcessExitError").Return(errors.New("exit code -1"), true)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("close").Return(nil).Once()
mockRunner.On("context").Return(ctx)

Expand Down Expand Up @@ -292,7 +292,7 @@ func TestCaptivePrepareRangeTerminated(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)

mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Twice()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(nil)

Expand Down Expand Up @@ -364,7 +364,7 @@ func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("close").Return(fmt.Errorf("transient error"))
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("getProcessExitError").Return(nil, false)
mockRunner.On("context").Return(ctx)

captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -440,7 +440,7 @@ func TestCaptivePrepareRange_FromIsAheadOfRootHAS(t *testing.T) {
}

mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)

assert.NoError(t, captiveBackend.PrepareRange(ctx, UnboundedRange(100)))
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestCaptivePrepareRangeWithDB_FromIsAheadOfRootHAS(t *testing.T) {
LedgerCloseMeta: &meta,
}
mockRunner.On("runFrom", uint32(99), "").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)

assert.NoError(t, captiveBackend.PrepareRange(ctx, UnboundedRange(100)))
Expand Down Expand Up @@ -517,7 +517,6 @@ func TestCaptivePrepareRange_ToIsAheadOfRootHAS(t *testing.T) {
func TestCaptivePrepareRange_ErrCatchup(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(100), uint32(192)).Return(errors.New("transient error")).Once()
mockRunner.On("close").Return(nil).Once()

mockArchive := &historyarchive.MockArchive{}
mockArchive.
Expand Down Expand Up @@ -552,7 +551,6 @@ func TestCaptivePrepareRange_ErrCatchup(t *testing.T) {
func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(126), "0000000000000000000000000000000000000000000000000000000000000000").Return(errors.New("transient error")).Once()
mockRunner.On("close").Return(nil).Once()

mockArchive := &historyarchive.MockArchive{}
mockArchive.
Expand Down Expand Up @@ -604,9 +602,9 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(64), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("getProcessExitError").Return(nil, false)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
Expand Down Expand Up @@ -653,7 +651,7 @@ func TestGetLatestLedgerSequence(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)

mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -699,7 +697,7 @@ func TestGetLatestLedgerSequenceRaceCondition(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("runFrom", mock.Anything, mock.Anything).Return(nil)

Expand Down Expand Up @@ -766,9 +764,9 @@ func TestCaptiveGetLedger(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("getProcessExitError").Return(nil, false)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
Expand Down Expand Up @@ -857,7 +855,7 @@ func TestCaptiveGetLedgerCacheLatestLedger(t *testing.T) {
defer cancel()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(65), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)

mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -919,7 +917,7 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T)
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(nil)

Expand Down Expand Up @@ -965,7 +963,7 @@ func TestCaptiveGetLedger_NextLedger0RangeFromIsSmallerThanLedgerFromBuffer(t *t
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(64), mock.Anything).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(nil)

Expand Down Expand Up @@ -1067,13 +1065,13 @@ func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
ctx, cancel := context.WithCancel(ctx)
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(nil).Run(func(args mock.Arguments) {
cancel()
}).Once()
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("getProcessExitError").Return(nil, false)

// even if the request to fetch the latest checkpoint succeeds, we should fail at creating the subprocess
mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -1125,7 +1123,7 @@ func TestCaptiveGetLedger_ErrClosingAfterLastLedger(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(fmt.Errorf("transient error")).Once()

Expand Down Expand Up @@ -1167,7 +1165,7 @@ func TestCaptiveAfterClose(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
ctx, cancel := context.WithCancel(context.Background())
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(nil).Once()

Expand Down Expand Up @@ -1222,7 +1220,7 @@ func TestGetLedgerBoundsCheck(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(128), uint32(130)).Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)

mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -1346,9 +1344,9 @@ func TestCaptiveGetLedgerTerminatedUnexpectedly(t *testing.T) {
ctx := testCase.ctx
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(64), uint32(100)).Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("getProcessExitError").Return(testCase.processExited, testCase.processExitedError)
mockRunner.On("getProcessExitError").Return(testCase.processExitedError, testCase.processExited)
mockRunner.On("close").Return(nil).Once()

mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -1514,7 +1512,7 @@ func TestCaptiveRunFromParams(t *testing.T) {
func TestCaptiveIsPrepared(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("context").Return(context.Background()).Maybe()
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("getProcessExitError").Return(nil, false)

// c.prepared == nil
captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -1578,7 +1576,7 @@ func TestCaptiveIsPreparedCoreContextCancelled(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
ctx, cancel := context.WithCancel(context.Background())
mockRunner.On("context").Return(ctx).Maybe()
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("getProcessExitError").Return(nil, false)

rang := UnboundedRange(100)
captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -1630,7 +1628,7 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(254), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(nil).Once()

Expand Down
Loading

0 comments on commit b589529

Please sign in to comment.