Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/catchpointdump/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.Catc
if err != nil {
return fileHeader, err
}
if header.Name == "content.msgpack" {
if header.Name == ledger.CatchpointContentFileName {
// we already know it's valid, since we validated that above.
protocol.Decode(balancesBlockBytes, &fileHeader)
}
Expand Down
13 changes: 0 additions & 13 deletions data/accountManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,6 @@ func MakeAccountManager(log logging.Logger, registry account.ParticipationRegist
return manager
}

// DeleteStateProofKeysForExpiredAccounts removes ephemeral keys for every expired account
func (manager *AccountManager) DeleteStateProofKeysForExpiredAccounts(currentRound basics.Round) {
for _, part := range manager.registry.GetAll() {
if currentRound <= part.LastValid {
continue
}
// since DeleteStateProofKeys doesn't remove the last round, we add one to make sure all secrets are being removed
if err := manager.DeleteStateProofKey(part.ParticipationID, part.LastValid+1); err != nil {
manager.log.Warnf("error while removing state proof keys for participant %v on round %v: %v", part.ParticipationID, part.LastValid+1, err)
}
}
}

// Keys returns a list of Participation accounts, and their keys/secrets for requested round.
func (manager *AccountManager) Keys(rnd basics.Round) (out []account.ParticipationRecordForRound) {
for _, part := range manager.registry.GetAll() {
Expand Down
4 changes: 2 additions & 2 deletions data/accountManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,15 +287,15 @@ func TestAccountManagerRemoveStateProofKeysForExpiredAccounts(t *testing.T) {
a.Equal(1, len(res))
}

acctManager.DeleteStateProofKeysForExpiredAccounts(part1.LastValid + 1)
b := bookkeeping.BlockHeader{Round: part1.LastValid + 1}
acctManager.DeleteOldKeys(b, config.Consensus[protocol.ConsensusCurrentVersion])
err = acctManager.registry.Flush(10 * time.Second)
a.NoError(err)

for i := 1; i <= 2; i++ {
res := acctManager.StateProofKeys(basics.Round(i * merklesignature.KeyLifetimeDefault))
a.Equal(0, len(res))
}

}

func TestGetStateProofKeysDontLogErrorOnNilStateProof(t *testing.T) {
Expand Down
10 changes: 3 additions & 7 deletions ledger/apply/stateproof.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ func gatherVerificationContextUsingBlockHeaders(sp StateProofsApplier, lastRound
return nil, err
}

verificationContext := ledgercore.StateProofVerificationContext{
LastAttestedRound: lastRoundInInterval,
VotersCommitment: votersHdr.StateProofTracking[protocol.StateProofBasic].StateProofVotersCommitment,
OnlineTotalWeight: votersHdr.StateProofTracking[protocol.StateProofBasic].StateProofOnlineTotalWeight,
Version: votersHdr.CurrentProtocol,
}
return &verificationContext, nil
verificationContext := ledgercore.MakeStateProofVerificationContext(&votersHdr, lastRoundInInterval)

return verificationContext, nil
}
2 changes: 1 addition & 1 deletion ledger/apply/stateproof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,6 @@ func TestApplyStateProof(t *testing.T) {
// transaction should be applied without stateproof validation (no context, blockheader or valid stateproof needed as it represents a node catching up)
err = StateProof(stateProofTx, atRound, applier, false)
a.NoError(err)
// make sure that the StateProofNext was updated correctly after applying
// make sure that the ModStateProofNextRound was updated correctly after applying
a.Equal(basics.Round(512+config.Consensus[protocol.ConsensusFuture].StateProofInterval), applier.GetStateProofNextRound())
}
20 changes: 13 additions & 7 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ const (
// CatchpointFileVersionV7 is the catchpoint file version that is matching database schema V10.
// This version introduced state proof verification data and versioning for CatchpointLabel.
CatchpointFileVersionV7 = uint64(0202)

CatchpointContentFileName = "content.msgpack"
CatchpointSPVerificationFileName = "stateProofVerificationContext.msgpack"
CatchpointBalancesFileNameTemplate = "balances.%d.msgpack"
CatchpointBalancesFileNamePrefix = "balances."
CatchpointBalancesFileNameSuffix = ".msgpack"
)

func catchpointStage1Encoder(w io.Writer) (io.WriteCloser, error) {
Expand Down Expand Up @@ -199,7 +205,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic
var totalAccounts uint64
var totalChunks uint64
var biggestChunkLen uint64
var stateProofVerificationHash crypto.Digest
var spVerificationHash crypto.Digest
var catchpointGenerationStats telemetryspec.CatchpointGenerationEventDetails

if ct.enableGeneratingCatchpointFiles {
Expand All @@ -210,7 +216,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic
var err error

catchpointGenerationStats.BalancesWriteTime = uint64(updatingBalancesDuration.Nanoseconds())
totalKVs, totalAccounts, totalChunks, biggestChunkLen, stateProofVerificationHash, err = ct.generateCatchpointData(
totalKVs, totalAccounts, totalChunks, biggestChunkLen, spVerificationHash, err = ct.generateCatchpointData(
ctx, dbRound, &catchpointGenerationStats)
atomic.StoreInt32(&ct.catchpointDataWriting, 0)
if err != nil {
Expand All @@ -224,7 +230,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic
return err
}

err = ct.recordFirstStageInfo(ctx, tx, &catchpointGenerationStats, dbRound, totalKVs, totalAccounts, totalChunks, biggestChunkLen, stateProofVerificationHash)
err = ct.recordFirstStageInfo(ctx, tx, &catchpointGenerationStats, dbRound, totalKVs, totalAccounts, totalChunks, biggestChunkLen, spVerificationHash)
if err != nil {
return err
}
Expand Down Expand Up @@ -595,7 +601,7 @@ func doRepackCatchpoint(ctx context.Context, header CatchpointFileHeader, bigges
bytes := protocol.Encode(&header)

err := out.WriteHeader(&tar.Header{
Name: "content.msgpack",
Name: CatchpointContentFileName,
Mode: 0600,
Size: int64(len(bytes)),
})
Expand Down Expand Up @@ -1090,7 +1096,7 @@ func (ct *catchpointTracker) IsWritingCatchpointDataFile() bool {
// - Balance and KV chunk (named balances.x.msgpack).
// ...
// - Balance and KV chunk (named balances.x.msgpack).
func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, accountsRound basics.Round, catchpointGenerationStats *telemetryspec.CatchpointGenerationEventDetails) (totalKVs, totalAccounts, totalChunks, biggestChunkLen uint64, stateProofVerificationContextHash crypto.Digest, err error) {
func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, accountsRound basics.Round, catchpointGenerationStats *telemetryspec.CatchpointGenerationEventDetails) (totalKVs, totalAccounts, totalChunks, biggestChunkLen uint64, spVerificationHash crypto.Digest, err error) {
ct.log.Debugf("catchpointTracker.generateCatchpointData() writing catchpoint accounts for round %d", accountsRound)

startTime := time.Now()
Expand Down Expand Up @@ -1120,7 +1126,7 @@ func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, account
return
}

stateProofVerificationContextHash, err = catchpointWriter.WriteStateProofVerificationContext()
spVerificationHash, err = catchpointWriter.WriteStateProofVerificationContext()
if err != nil {
return
}
Expand Down Expand Up @@ -1184,7 +1190,7 @@ func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, account
catchpointGenerationStats.KVsCount = catchpointWriter.totalKVs
catchpointGenerationStats.AccountsRound = uint64(accountsRound)

return catchpointWriter.totalKVs, catchpointWriter.totalAccounts, catchpointWriter.chunkNum, catchpointWriter.biggestChunkLen, stateProofVerificationContextHash, nil
return catchpointWriter.totalKVs, catchpointWriter.totalAccounts, catchpointWriter.chunkNum, catchpointWriter.biggestChunkLen, spVerificationHash, nil
}

func (ct *catchpointTracker) recordFirstStageInfo(ctx context.Context, tx trackerdb.TransactionScope, catchpointGenerationStats *telemetryspec.CatchpointGenerationEventDetails, accountsRound basics.Round, totalKVs uint64, totalAccounts uint64, totalChunks uint64, biggestChunkLen uint64, stateProofVerificationHash crypto.Digest) error {
Expand Down
92 changes: 82 additions & 10 deletions ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,16 +312,7 @@ func TestRecordCatchpointFile(t *testing.T) {

for _, round := range []basics.Round{2000000, 3000010, 3000015, 3000020} {
accountsRound := round - 1

var catchpointGenerationStats telemetryspec.CatchpointGenerationEventDetails
_, _, _, biggestChunkLen, stateProofVerificationHash, err := ct.generateCatchpointData(
context.Background(), accountsRound, &catchpointGenerationStats)
require.NoError(t, err)

require.Equal(t, calculateStateProofVerificationHash(t, ml), stateProofVerificationHash)

err = ct.createCatchpoint(context.Background(), accountsRound, round, trackerdb.CatchpointFirstStageInfo{BiggestChunkLen: biggestChunkLen}, crypto.Digest{})
require.NoError(t, err)
createCatchpoint(t, ct, accountsRound, ml, round)
}

numberOfCatchpointFiles, err := getNumberOfCatchpointFilesInDir(temporaryDirectory)
Expand All @@ -335,6 +326,87 @@ func TestRecordCatchpointFile(t *testing.T) {
require.Equalf(t, onlyCatchpointDirEmpty, true, "Directories: %v", emptyDirs)
}

func createCatchpoint(t *testing.T, ct *catchpointTracker, accountsRound basics.Round, ml *mockLedgerForTracker, round basics.Round) {
var catchpointGenerationStats telemetryspec.CatchpointGenerationEventDetails
_, _, _, biggestChunkLen, stateProofVerificationHash, err := ct.generateCatchpointData(
context.Background(), accountsRound, &catchpointGenerationStats)
require.NoError(t, err)

require.Equal(t, calculateStateProofVerificationHash(t, ml), stateProofVerificationHash)

err = ct.createCatchpoint(context.Background(), accountsRound, round, trackerdb.CatchpointFirstStageInfo{BiggestChunkLen: biggestChunkLen}, crypto.Digest{})
require.NoError(t, err)
}

// TestCatchpointFileWithLargeSpVerification makes sure that CatchpointFirstStageInfo.BiggestChunkLen is calculated based on state proof verification contexts
// as well as other chunks in the catchpoint files.
func TestCatchpointFileWithLargeSpVerification(t *testing.T) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a brief comment for this test

partitiontest.PartitionTest(t)

temporaryDirectory := t.TempDir()

accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts)
defer ml.Close()

ct := &catchpointTracker{}
conf := config.GetDefaultLocal()

conf.Archival = true
ct.initialize(conf, ".")
defer ct.close()
ct.dbDirectory = temporaryDirectory

_, err := trackerDBInitialize(ml, true, ct.dbDirectory)
require.NoError(t, err)

err = ct.loadFromDisk(ml, ml.Latest())
require.NoError(t, err)

// create catpoint with no sp verification data
round := basics.Round(2000000)
createCatchpoint(t, ct, round-1, ml, round)

numberOfCatchpointFiles, err := getNumberOfCatchpointFilesInDir(temporaryDirectory)
require.NoError(t, err)
require.Equal(t, 1, numberOfCatchpointFiles)
// create catpoint with 2 sp verification data
writeDummySpVerification(t, 0, 3, ml)

round = basics.Round(3000000)
createCatchpoint(t, ct, round-1, ml, round)

numberOfCatchpointFiles, err = getNumberOfCatchpointFilesInDir(temporaryDirectory)
require.NoError(t, err)
require.Equal(t, 2, numberOfCatchpointFiles)

// create catpoint with 500 sp verification data - the sp verification chunk should be the largest
writeDummySpVerification(t, 4, 500, ml)

round = basics.Round(4000000)
createCatchpoint(t, ct, round-1, ml, round)

numberOfCatchpointFiles, err = getNumberOfCatchpointFilesInDir(temporaryDirectory)
require.NoError(t, err)
require.Equal(t, 3, numberOfCatchpointFiles)
}

func writeDummySpVerification(t *testing.T, nextIndexForContext uint64, numberOfContexts uint64, ml *mockLedgerForTracker) {
err := ml.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) error {

contexts := make([]*ledgercore.StateProofVerificationContext, numberOfContexts)
for i := uint64(0); i < numberOfContexts; i++ {
e := ledgercore.StateProofVerificationContext{}
e.LastAttestedRound = basics.Round(nextIndexForContext + i)
contexts[i] = &e
}
writer := tx.MakeSpVerificationCtxReaderWriter()

return writer.StoreSPContexts(ctx, contexts[:])
})
require.NoError(t, err)
}

func BenchmarkLargeCatchpointDataWriting(b *testing.B) {
proto := config.Consensus[protocol.ConsensusCurrentVersion]

Expand Down
14 changes: 9 additions & 5 deletions ledger/catchpointwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ const (
// In reality most entries are asset holdings, and they are very small.
ResourcesPerCatchpointFileChunk = 100_000

// StateProofVerificationContextPerCatchpointFile defines the maximum number of state proof verification data stored
// SPContextPerCatchpointFile defines the maximum number of state proof verification data stored
// in the catchpoint file.
// (2 years * 31536000 seconds per year) / (256 rounds per state proof verification data * 3.6 seconds per round) ~= 70000
StateProofVerificationContextPerCatchpointFile = 70000
SPContextPerCatchpointFile = 70000
)

// catchpointWriter is the struct managing the persistence of accounts data into the catchpoint file.
Expand Down Expand Up @@ -100,7 +100,7 @@ func (chunk catchpointFileChunkV6) empty() bool {

type catchpointStateProofVerificationContext struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`
Data []ledgercore.StateProofVerificationContext `codec:"spd,allocbound=StateProofVerificationContextPerCatchpointFile"`
Data []ledgercore.StateProofVerificationContext `codec:"spd,allocbound=SPContextPerCatchpointFile"`
}

func (data catchpointStateProofVerificationContext) ToBeHashed() (protocol.HashID, []byte) {
Expand Down Expand Up @@ -170,7 +170,7 @@ func (cw *catchpointWriter) WriteStateProofVerificationContext() (crypto.Digest,
dataHash, encodedData := crypto.EncodeAndHash(wrappedData)

err = cw.tar.WriteHeader(&tar.Header{
Name: "stateProofVerificationContext.msgpack",
Name: CatchpointSPVerificationFileName,
Mode: 0600,
Size: int64(len(encodedData)),
})
Expand All @@ -184,6 +184,10 @@ func (cw *catchpointWriter) WriteStateProofVerificationContext() (crypto.Digest,
return crypto.Digest{}, err
}

if chunkLen := uint64(len(encodedData)); cw.biggestChunkLen < chunkLen {
cw.biggestChunkLen = chunkLen
}

return dataHash, nil
}

Expand Down Expand Up @@ -290,7 +294,7 @@ func (cw *catchpointWriter) asyncWriter(chunks chan catchpointFileChunkV6, respo
}
encodedChunk := protocol.Encode(&chk)
err := cw.tar.WriteHeader(&tar.Header{
Name: fmt.Sprintf("balances.%d.msgpack", chunkNum),
Name: fmt.Sprintf(CatchpointBalancesFileNameTemplate, chunkNum),
Mode: 0600,
Size: int64(len(encodedChunk)),
})
Expand Down
5 changes: 3 additions & 2 deletions ledger/catchpointwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func verifyStateProofVerificationContextWrite(t *testing.T, data []ledgercore.St
})

catchpointData := readCatchpointDataFile(t, fileName)
require.Equal(t, "stateProofVerificationContext.msgpack", catchpointData[0].headerName)
require.Equal(t, CatchpointSPVerificationFileName, catchpointData[0].headerName)
var wrappedData catchpointStateProofVerificationContext
err = protocol.Decode(catchpointData[0].data, &wrappedData)
require.NoError(t, err)
Expand Down Expand Up @@ -275,7 +275,8 @@ func TestBasicCatchpointWriter(t *testing.T) {
})

catchpointContent := readCatchpointDataFile(t, fileName)
require.Equal(t, "balances.1.msgpack", catchpointContent[1].headerName)
balanceFileName := fmt.Sprintf(CatchpointBalancesFileNameTemplate, 1)
require.Equal(t, balanceFileName, catchpointContent[1].headerName)

var chunk catchpointFileChunkV6
err = protocol.Decode(catchpointContent[1].data, &chunk)
Expand Down
10 changes: 5 additions & 5 deletions ledger/catchupaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,13 @@ type CatchpointCatchupAccessorProgress struct {
// ProcessStagingBalances deserialize the given bytes as a temporary staging balances
func (c *catchpointCatchupAccessorImpl) ProcessStagingBalances(ctx context.Context, sectionName string, bytes []byte, progress *CatchpointCatchupAccessorProgress) (err error) {
// content.msgpack comes first, followed by stateProofVerificationContext.msgpack and then by balances.x.msgpack.
if sectionName == "content.msgpack" {
if sectionName == CatchpointContentFileName {
return c.processStagingContent(ctx, bytes, progress)
}
if sectionName == "stateProofVerificationContext.msgpack" {
if sectionName == CatchpointSPVerificationFileName {
return c.processStagingStateProofVerificationContext(bytes)
}
if strings.HasPrefix(sectionName, "balances.") && strings.HasSuffix(sectionName, ".msgpack") {
if strings.HasPrefix(sectionName, CatchpointBalancesFileNamePrefix) && strings.HasSuffix(sectionName, CatchpointBalancesFileNameSuffix) {
return c.processStagingBalances(ctx, bytes, progress)
}
// we want to allow undefined sections to support backward compatibility.
Expand Down Expand Up @@ -972,14 +972,14 @@ func (c *catchpointCatchupAccessorImpl) VerifyCatchpoint(ctx context.Context, bl
}

wrappedContext := catchpointStateProofVerificationContext{Data: rawStateProofVerificationContext}
stateProofVerificationContextHash := crypto.HashObj(wrappedContext)
spVerificationHash := crypto.HashObj(wrappedContext)

var catchpointLabelMaker ledgercore.CatchpointLabelMaker
blockDigest := blk.Digest()
if version <= CatchpointFileVersionV6 {
catchpointLabelMaker = ledgercore.MakeCatchpointLabelMakerV6(blockRound, &blockDigest, &balancesHash, totals)
} else {
catchpointLabelMaker = ledgercore.MakeCatchpointLabelMakerCurrent(blockRound, &blockDigest, &balancesHash, totals, &stateProofVerificationContextHash)
catchpointLabelMaker = ledgercore.MakeCatchpointLabelMakerCurrent(blockRound, &blockDigest, &balancesHash, totals, &spVerificationHash)
}
generatedLabel := ledgercore.MakeLabel(catchpointLabelMaker)

Expand Down
Loading