Ledger: improving ledger export cmd functionality #175

Merged · 7 commits · Dec 3, 2020
19 changes: 13 additions & 6 deletions cmd/util/cmd/exec-data-json-export/block_exporter.go
@@ -31,7 +31,7 @@ type blockSummary struct {
}

// ExportBlocks exports blocks (note this only exports blocks of the main chain and doesn't export forks)
func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) error {
func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) (flow.StateCommitment, error) {

// traverse backward from the given block (parent block) and fetch by blockHash
db := common.InitStorage(dbPath)
@@ -44,13 +44,14 @@ func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) err
seals := badger.NewSeals(cacheMetrics, db)
payloads := badger.NewPayloads(db, index, guarantees, seals)
blocks := badger.NewBlocks(db, headers, payloads)
commits := badger.NewCommits(&metrics.NoopCollector{}, db)

activeBlockID := blockID
outputFile := filepath.Join(outputPath, "blocks.jsonl")

fi, err := os.Create(outputFile)
if err != nil {
return fmt.Errorf("could not create block output file %w", err)
return nil, fmt.Errorf("could not create block output file %w", err)
}
defer fi.Close()

@@ -61,13 +62,13 @@ func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) err
header, err := headers.ByBlockID(activeBlockID)
if err != nil {
// no more header is available
return nil
break
}

block, err := blocks.ByID(activeBlockID)
if err != nil {
// log.Fatal().Err(err).Msg("could not load block")
return nil
break
}

cols := make([]string, 0)
@@ -104,14 +105,20 @@ func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) err

jsonData, err := json.Marshal(b)
if err != nil {
return fmt.Errorf("could not create a json obj for a block: %w", err)
return nil, fmt.Errorf("could not create a json obj for a block: %w", err)
}
_, err = blockWriter.WriteString(string(jsonData) + "\n")
if err != nil {
return fmt.Errorf("could not write block json to the file: %w", err)
return nil, fmt.Errorf("could not write block json to the file: %w", err)
}
blockWriter.Flush()

activeBlockID = header.ParentID
}

state, err := commits.ByBlockID(blockID)
if err != nil {
return nil, fmt.Errorf("could not find state commitment for this block: %w", err)
}
return state, nil
}
19 changes: 15 additions & 4 deletions cmd/util/cmd/exec-data-json-export/cmd.go
@@ -1,17 +1,19 @@
package jsonexporter

import (
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"encoding/hex"

"github.com/onflow/flow-go/model/flow"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)

var (
flagExecutionStateDir string
flagOutputDir string
flagBlockHash string
flagDatadir string
flagStateCommitment string
)

var Cmd = &cobra.Command{
@@ -36,6 +38,9 @@ func init() {
Cmd.Flags().StringVar(&flagDatadir, "datadir", "",
"directory that stores the protocol state")
_ = Cmd.MarkFlagRequired("datadir")

Cmd.Flags().StringVar(&flagStateCommitment, "state-commitment", "",
"state commitment (hex-encoded, 64 characters)")
}

func run(*cobra.Command, []string) {
@@ -46,7 +51,7 @@ func run(*cobra.Command, []string) {
}

log.Info().Msg("start exporting blocks")
err = ExportBlocks(blockID, flagDatadir, flagOutputDir)
fallbackState, err := ExportBlocks(blockID, flagDatadir, flagOutputDir)
if err != nil {
log.Fatal().Err(err).Msg("cannot get export blocks")
}
@@ -70,7 +75,13 @@ func run(*cobra.Command, []string) {
}

log.Info().Msg("start exporting ledger")
err = ExportLedger(blockID, flagDatadir, flagExecutionStateDir, flagOutputDir)
// if no state commitment is provided, fall back to the one attached to the block
if len(flagStateCommitment) == 0 {
flagStateCommitment = hex.EncodeToString(fallbackState)
log.Info().Msg("no state commitment is provided, falling back to the one attached to the block")
}

err = ExportLedger(flagExecutionStateDir, flagStateCommitment, flagOutputDir)
if err != nil {
log.Fatal().Err(err).Msg("cannot get export ledger")
}
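For reference, a minimal standalone sketch (not part of this PR) of the fallback behaviour run() now implements: an explicit --state-commitment value is used as-is, otherwise the commitment returned by ExportBlocks is hex-encoded and used instead. The helper name resolveStateCommitment and the byte-slice commitment below are illustrative only.

package main

import (
	"encoding/hex"
	"fmt"
)

// resolveStateCommitment mirrors the fallback in run(): an explicit hex-encoded
// commitment wins; otherwise the commitment attached to the exported block is used.
// This helper is illustrative and does not exist in the PR.
func resolveStateCommitment(flagValue string, fallback []byte) (string, error) {
	if len(flagValue) > 0 {
		// sanity-check that the flag value is valid hex before handing it on
		if _, err := hex.DecodeString(flagValue); err != nil {
			return "", fmt.Errorf("invalid state commitment flag: %w", err)
		}
		return flagValue, nil
	}
	return hex.EncodeToString(fallback), nil
}

func main() {
	fallback := []byte{0xde, 0xad, 0xbe, 0xef}
	sc, err := resolveStateCommitment("", fallback)
	if err != nil {
		panic(err)
	}
	fmt.Println(sc) // prints "deadbeef"
}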
119 changes: 8 additions & 111 deletions cmd/util/cmd/exec-data-json-export/ledger_exporter.go
@@ -1,133 +1,30 @@
package jsonexporter

import (
"bufio"
"bytes"
"encoding/hex"
"errors"
"fmt"
"os"
"path/filepath"
"time"

"github.com/rs/zerolog/log"

"github.com/onflow/flow-go/cmd/util/cmd/common"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/pathfinder"
"github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/ledger/complete/mtrie"
"github.com/onflow/flow-go/ledger/complete/mtrie/flattener"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage/badger"
)

// ExportLedger exports ledger key value pairs at the given blockID
func ExportLedger(blockID flow.Identifier, dbPath string, ledgerPath string, outputPath string) error {
db := common.InitStorage(dbPath)
defer db.Close()
func ExportLedger(ledgerPath string, targetstate string, outputPath string) error {

cache := &metrics.NoopCollector{}
commits := badger.NewCommits(cache, db)

targetHash, err := commits.ByBlockID(blockID)
led, err := complete.NewLedger(ledgerPath, complete.DefaultCacheSize, &metrics.NoopCollector{}, log.Logger, nil, 0)
if err != nil {
return fmt.Errorf("cannot get state commitment for block: %w", err)
return fmt.Errorf("cannot create ledger from write-a-head logs and checkpoints: %w", err)
}

w, err := wal.NewWAL(nil, nil, ledgerPath, complete.DefaultCacheSize, pathfinder.PathByteSize, wal.SegmentSize)
state, err := hex.DecodeString(targetstate)
if err != nil {
return fmt.Errorf("cannot create WAL: %w", err)
return fmt.Errorf("failed to decode hex code of state: %w", err)
}
defer func() {
_ = w.Close()
}()

// TODO port this to use new forest
forest, err := mtrie.NewForest(pathfinder.PathByteSize, outputPath, complete.DefaultCacheSize, &metrics.NoopCollector{}, nil)
err = led.DumpTrieAsJSON(ledger.State(state), outputPath)
if err != nil {
return fmt.Errorf("cannot create mForest: %w", err)
return fmt.Errorf("cannot dump trie as json: %w", err)
}

i := 0
valuesSize := 0
valuesCount := 0
startTime := time.Now()
found := false
FoundHashError := fmt.Errorf("found hash %s", targetHash)

err = w.ReplayLogsOnly(
func(forestSequencing *flattener.FlattenedForest) error {
rebuiltTries, err := flattener.RebuildTries(forestSequencing)
if err != nil {
return fmt.Errorf("rebuilding forest from sequenced nodes failed: %w", err)
}
err = forest.AddTries(rebuiltTries)
if err != nil {
return fmt.Errorf("adding rebuilt tries to forest failed: %w", err)
}
return nil
},
func(update *ledger.TrieUpdate) error {

newTrieHash, err := forest.Update(update)

for _, value := range update.Payloads {
valuesSize += len(value.Value)
}

valuesCount += len(update.Payloads)

if err != nil {
return fmt.Errorf("error while updating mForest: %w", err)
}

if bytes.Equal(targetHash, newTrieHash) {
found = true
return FoundHashError
}

i++
if i%1000 == 0 {
log.Info().Int("values_count", valuesCount).Int("values_size_bytes", valuesSize).Int("updates_count", i).Msg("progress")
}

return err
},
func(commitment ledger.RootHash) error {
return nil
})

duration := time.Since(startTime)

if !errors.Is(err, FoundHashError) {
return fmt.Errorf("error while processing WAL: %w", err)
}

if !found {
return fmt.Errorf("no value found: %w", err)
}

log.Info().Int("values_count", valuesCount).Int("values_size_bytes", valuesSize).Int("updates_count", i).Float64("total_time_s", duration.Seconds()).Msg("finished seeking")
log.Info().Msg("writing root checkpoint")

trie, err := forest.GetTrie(targetHash)
if err != nil {
return fmt.Errorf("cannot get a trie with target hash: %w", err)
}

path := filepath.Join(outputPath, hex.EncodeToString(targetHash)+".trie.jsonl")

fi, err := os.Create(path)
if err != nil {
return err
}
defer fi.Close()

writer := bufio.NewWriter(fi)
defer writer.Flush()

return trie.DumpAsJSON(writer)
return nil
}
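A hedged usage sketch of the simplified exporter: ExportLedger no longer opens the protocol-state database at all; it rebuilds the ledger from the execution state directory (WAL segments and checkpoints) and dumps the trie at the requested commitment. The import path assumes the flow-go module layout shown in this diff; the directories and the all-zero commitment below are placeholders, not working values.

package main

import (
	"log"
	"strings"

	jsonexporter "github.com/onflow/flow-go/cmd/util/cmd/exec-data-json-export"
)

func main() {
	// Placeholder inputs: a real run needs an execution state directory that
	// contains WAL segments/checkpoints, and a commitment that exists in it.
	ledgerDir := "/var/flow/execution-state"
	outputDir := "/tmp/ledger-export"
	stateCommitment := strings.Repeat("0", 64) // hex-encoded, 64 characters

	// Rebuilds the ledger from the WAL/checkpoints and writes a
	// <root hash>.trie.jsonl file into outputDir.
	if err := jsonexporter.ExportLedger(ledgerDir, stateCommitment, outputDir); err != nil {
		log.Fatal(err)
	}
}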
26 changes: 26 additions & 0 deletions ledger/complete/ledger.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package complete

import (
"bufio"
"encoding/hex"
"fmt"
"os"
"path/filepath"
"time"

"github.com/prometheus/client_golang/prometheus"
@@ -336,3 +340,25 @@ func (l *Ledger) ExportCheckpointAt(state ledger.State,

return newTrie.RootHash(), nil
}

// DumpTrieAsJSON exports the trie at a specific state as a JSONL file, where each line is the JSON encoding of a payload
func (l *Ledger) DumpTrieAsJSON(state ledger.State, outputFilePath string) error {
fmt.Println(ledger.RootHash(state))
trie, err := l.forest.GetTrie(ledger.RootHash(state))
if err != nil {
return fmt.Errorf("cannot find the target trie: %w", err)
}

path := filepath.Join(outputFilePath, hex.EncodeToString(ledger.RootHash(state))+".trie.jsonl")

fi, err := os.Create(path)
if err != nil {
return err
}
defer fi.Close()

writer := bufio.NewWriter(fi)
defer writer.Flush()

return trie.DumpAsJSON(writer)
}
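To round this off, a small sketch of consuming the JSONL output. The payload JSON schema is not defined anywhere in this diff, so the reader below only checks that each line parses as JSON; the file path is a placeholder for whatever DumpTrieAsJSON produced.

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"os"
)

func main() {
	// Hypothetical output file: <hex root hash>.trie.jsonl, as written by
	// DumpTrieAsJSON above. Replace with a real path.
	f, err := os.Open("/tmp/ledger-export/<roothash>.trie.jsonl")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	// Payload lines can be large; allow more than the default 64 KiB per line.
	scanner.Buffer(make([]byte, 0, 1024*1024), 16*1024*1024)

	count := 0
	for scanner.Scan() {
		// Decode generically since the payload shape is not shown in the diff.
		var payload map[string]interface{}
		if err := json.Unmarshal(scanner.Bytes(), &payload); err != nil {
			log.Fatalf("line %d is not valid JSON: %v", count+1, err)
		}
		count++
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("read %d payloads\n", count)
}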