Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove init-genesis-state command #5504

Merged
merged 6 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 0 additions & 31 deletions ingest/genesis.go

This file was deleted.

19 changes: 0 additions & 19 deletions ingest/genesis_test.go

This file was deleted.

53 changes: 1 addition & 52 deletions services/horizon/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,56 +259,6 @@ var ingestTriggerStateRebuildCmd = &cobra.Command{
},
}

var ingestInitGenesisStateCmd = &cobra.Command{
Use: "init-genesis-state",
Short: "ingests genesis state (ledger 1)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil {
return err
}

horizonSession, err := db.Open("postgres", globalConfig.DatabaseURL)
if err != nil {
return fmt.Errorf("cannot open Horizon DB: %v", err)
}

historyQ := &history.Q{SessionInterface: horizonSession}

lastIngestedLedger, err := historyQ.GetLastLedgerIngestNonBlocking(ctx)
if err != nil {
return fmt.Errorf("cannot get last ledger value: %v", err)
}

if lastIngestedLedger != 0 {
return fmt.Errorf("cannot run on non-empty DB")
}

ingestConfig := ingest.Config{
NetworkPassphrase: globalConfig.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
CheckpointFrequency: globalConfig.CheckpointFrequency,
RoundingSlippageFilter: globalConfig.RoundingSlippageFilter,
CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
}

system, err := ingest.NewSystem(ingestConfig)
if err != nil {
return err
}

err = system.BuildGenesisState()
if err != nil {
return err
}

log.Info("Genesis ledger stat successfully ingested!")
return nil
},
}

var ingestBuildStateCmd = &cobra.Command{
Use: "build-state",
Short: "builds state at a given checkpoint. warning! requires clean DB.",
karthikiyer56 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -342,7 +292,7 @@ var ingestBuildStateCmd = &cobra.Command{
}

mngr := historyarchive.NewCheckpointManager(globalConfig.CheckpointFrequency)
if !mngr.IsCheckpoint(ingestBuildStateSequence) && ingestBuildStateSequence != 1 {
if !mngr.IsCheckpoint(ingestBuildStateSequence) {
return fmt.Errorf("`--sequence` must be a checkpoint ledger")
}

Expand Down Expand Up @@ -406,7 +356,6 @@ func init() {
ingestVerifyRangeCmd,
ingestStressTestCmd,
ingestTriggerStateRebuildCmd,
ingestInitGenesisStateCmd,
ingestBuildStateCmd,
)
}
2 changes: 0 additions & 2 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,12 @@ var (
DbFillGapsCmd = "fill-gaps"
DbReingestCmd = "reingest"
IngestTriggerStateRebuild = "trigger-state-rebuild"
IngestInitGenesisStateCmd = "init-genesis-state"
IngestBuildStateCmd = "build-state"
IngestStressTestCmd = "stress-test"
IngestVerifyRangeCmd = "verify-range"

ApiServerCommands = []string{HorizonCmd, ServeCmd}
IngestionCommands = append(ApiServerCommands,
IngestInitGenesisStateCmd,
IngestBuildStateCmd,
IngestStressTestCmd,
IngestVerifyRangeCmd,
Expand Down
21 changes: 0 additions & 21 deletions services/horizon/internal/ingest/build_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,27 +216,6 @@ func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionReturnsError() {
s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next)
}

func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionGenesisReturnsError() {
// Recreate mock in this single test to remove assertions.
*s.ledgerBackend = ledgerbackend.MockDatabaseBackend{}

s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once()
s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once()
s.historyQ.On("UpdateLastLedgerIngest", s.ctx, uint32(0)).Return(nil).Once()
s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(nil).Once()
s.historyQ.On("TruncateIngestStateTables", s.ctx).Return(nil).Once()

s.runner.
On("RunGenesisStateIngestion").
Return(ingest.StatsChangeProcessorResults{}, errors.New("my error")).
Once()
next, err := buildState{checkpointLedger: 1}.run(s.system)

s.Assert().Error(err)
s.Assert().EqualError(err, "Error ingesting history archive: my error")
s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next)
}

func (s *BuildStateTestSuite) TestUpdateLastLedgerIngestAfterIngestReturnsError() {
s.mockCommonHistoryQ()
s.runner.
Expand Down
26 changes: 9 additions & 17 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,16 +299,11 @@ func (b buildState) run(s *system) (transition, error) {
return nextFailState, errors.New("unexpected checkpointLedger value")
}

// We don't need to prepare range for genesis checkpoint because we don't
// perform protocol version and bucket list hash checks.
// In the long term we should probably create artificial xdr.LedgerCloseMeta
// for ledger #1 instead of using `ingest.GenesisChange` reader in
// ProcessorRunner.RunHistoryArchiveIngestion().
// We can also skip preparing range if `skipChecks` is `true` because we
// We can skip preparing range if `skipChecks` is `true` because we
// won't need bucket list hash and protocol version.
karthikiyer56 marked this conversation as resolved.
Show resolved Hide resolved
var protocolVersion uint32
var bucketListHash xdr.Hash
if b.checkpointLedger != 1 && !b.skipChecks {
if !b.skipChecks {
err := s.maybePrepareRange(s.ctx, b.checkpointLedger)
if err != nil {
return nextFailState, err
Expand Down Expand Up @@ -381,16 +376,13 @@ func (b buildState) run(s *system) (transition, error) {
startTime := time.Now()

var stats ingest.StatsChangeProcessorResults
if b.checkpointLedger == 1 {
stats, err = s.runner.RunGenesisStateIngestion()
} else {
stats, err = s.runner.RunHistoryArchiveIngestion(
b.checkpointLedger,
b.skipChecks,
protocolVersion,
bucketListHash,
)
}

stats, err = s.runner.RunHistoryArchiveIngestion(
b.checkpointLedger,
b.skipChecks,
protocolVersion,
bucketListHash,
)

if err != nil {
return nextFailState, errors.Wrap(err, "Error ingesting history archive")
Expand Down
10 changes: 0 additions & 10 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ type System interface {
VerifyRange(fromLedger, toLedger uint32, verifyState bool) error
BuildState(sequence uint32, skipChecks bool) error
ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error
BuildGenesisState() error
Shutdown()
GetCurrentState() State
RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) error
Expand Down Expand Up @@ -675,15 +674,6 @@ func (s *system) RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) err
return s.historyQ.RebuildTradeAggregationBuckets(s.ctx, fromLedger, toLedger, s.config.RoundingSlippageFilter)
}

// BuildGenesisState runs the ingestion pipeline on genesis ledger. Transitions
// to stopState when done.
func (s *system) BuildGenesisState() error {
return s.runStateMachine(buildState{
checkpointLedger: 1,
stop: true,
})
}

func (s *system) runStateMachine(cur stateMachineNode) error {
s.wg.Add(1)
defer func() {
Expand Down
10 changes: 0 additions & 10 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,11 +642,6 @@ func (m *mockProcessorsRunner) DisableMemoryStatsLogging() {
m.Called()
}

func (m *mockProcessorsRunner) RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error) {
args := m.Called()
return args.Get(0).(ingest.StatsChangeProcessorResults), args.Error(1)
}

func (m *mockProcessorsRunner) RunHistoryArchiveIngestion(
checkpointLedger uint32,
skipChecks bool,
Expand Down Expand Up @@ -721,11 +716,6 @@ func (m *mockSystem) ReingestRange(ledgerRanges []history.LedgerRange, force boo
return args.Error(0)
}

func (m *mockSystem) BuildGenesisState() error {
args := m.Called()
return args.Error(0)
}

func (m *mockSystem) GetCurrentState() State {
args := m.Called()
return args.Get(0).(State)
Expand Down
59 changes: 24 additions & 35 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
SetHistoryAdapter(historyAdapter historyArchiveAdapterInterface)
EnableMemoryStatsLogging()
DisableMemoryStatsLogging()
RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error)
RunHistoryArchiveIngestion(
checkpointLedger uint32,
skipChecks bool,
Expand Down Expand Up @@ -192,10 +191,6 @@
return nil
}

func (s *ProcessorRunner) RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error) {
return s.RunHistoryArchiveIngestion(1, false, 0, xdr.Hash{})
}

func (s *ProcessorRunner) RunHistoryArchiveIngestion(
checkpointLedger uint32,
skipChecks bool,
Expand All @@ -218,43 +213,37 @@
return ingest.StatsChangeProcessorResults{}, err
}

if checkpointLedger == 1 {
if err := changeProcessor.ProcessChange(s.ctx, ingest.GenesisChange(s.config.NetworkPassphrase)); err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error ingesting genesis ledger")
}
} else {
if !skipChecks {
if err := s.checkIfProtocolVersionSupported(ledgerProtocolVersion); err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error while checking for supported protocol version")
}
if !skipChecks {
if err := s.checkIfProtocolVersionSupported(ledgerProtocolVersion); err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error while checking for supported protocol version")
}
}

changeReader, err := s.historyAdapter.GetState(s.ctx, checkpointLedger)
if err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error creating HAS reader")
}
changeReader, err := s.historyAdapter.GetState(s.ctx, checkpointLedger)
if err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error creating HAS reader")
}

if !skipChecks {
if err = changeReader.VerifyBucketList(bucketListHash); err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error validating bucket list from HAS")
}
if !skipChecks {
if err = changeReader.VerifyBucketList(bucketListHash); err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error validating bucket list from HAS")
}
}

defer changeReader.Close()
defer changeReader.Close()

Check failure on line 233 in services/horizon/internal/ingest/processor_runner.go

View workflow job for this annotation

GitHub Actions / golangci

changeReader.Close undefined (type verifiableChangeReader has no field or method Close) (typecheck)

log.WithField("sequence", checkpointLedger).
Info("Processing entries from History Archive Snapshot")
log.WithField("sequence", checkpointLedger).
Info("Processing entries from History Archive Snapshot")

err = streamChanges(s.ctx, changeProcessor, checkpointLedger, newloggingChangeReader(
changeReader,
"historyArchive",
checkpointLedger,
logFrequency,
s.logMemoryStats,
))
if err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error streaming changes from HAS")
}
err = streamChanges(s.ctx, changeProcessor, checkpointLedger, newloggingChangeReader(
changeReader,
"historyArchive",
checkpointLedger,
logFrequency,
s.logMemoryStats,
))
if err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error streaming changes from HAS")
}

if err := changeProcessor.Commit(s.ctx); err != nil {
Expand Down
Loading
Loading