Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 51 additions & 15 deletions cmd/catchpointdump/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"archive/tar"
"bufio"
"compress/gzip"
"context"
"database/sql"
"encoding/base64"
Expand All @@ -43,12 +44,12 @@ import (
"github.com/algorand/go-algorand/util/db"
)

var tarFile string
var catchpointFile string
var outFileName string
var excludedFields *cmdutil.CobraStringSliceValue = cmdutil.MakeCobraStringSliceValue(nil, []string{"version", "catchpoint"})

func init() {
fileCmd.Flags().StringVarP(&tarFile, "tar", "t", "", "Specify the tar file to process")
fileCmd.Flags().StringVarP(&catchpointFile, "tar", "t", "", "Specify the catchpoint file (either .tar or .tar.gz) to process")
fileCmd.Flags().StringVarP(&outFileName, "output", "o", "", "Specify an outfile for the dump ( i.e. tracker.dump.txt )")
fileCmd.Flags().BoolVarP(&loadOnly, "load", "l", false, "Load only, do not dump")
fileCmd.Flags().VarP(excludedFields, "exclude-fields", "e", "List of fields to exclude from the dump: ["+excludedFields.AllowedString()+"]")
Expand All @@ -60,18 +61,18 @@ var fileCmd = &cobra.Command{
Long: "Specify a file to dump",
Args: validateNoPosArgsFn,
Run: func(cmd *cobra.Command, args []string) {
if tarFile == "" {
if catchpointFile == "" {
cmd.HelpFunc()(cmd, args)
return
}
stats, err := os.Stat(tarFile)
stats, err := os.Stat(catchpointFile)
if err != nil {
reportErrorf("Unable to stat '%s' : %v", tarFile, err)
reportErrorf("Unable to stat '%s' : %v", catchpointFile, err)
}

tarSize := stats.Size()
if tarSize == 0 {
reportErrorf("Empty file '%s' : %v", tarFile, err)
catchpointSize := stats.Size()
if catchpointSize == 0 {
reportErrorf("Empty file '%s' : %v", catchpointFile, err)
}
// TODO: store CurrentProtocol in catchpoint file header.
// As a temporary workaround use a current protocol version.
Expand Down Expand Up @@ -105,13 +106,13 @@ var fileCmd = &cobra.Command{
}
var fileHeader ledger.CatchpointFileHeader

reader, err := os.Open(tarFile)
reader, err := os.Open(catchpointFile)
if err != nil {
reportErrorf("Unable to read '%s' : %v", tarFile, err)
reportErrorf("Unable to read '%s' : %v", catchpointFile, err)
}
defer reader.Close()

fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, reader, tarSize)
fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, reader, catchpointSize)
if err != nil {
reportErrorf("Unable to load catchpoint file into in-memory database : %v", err)
}
Expand Down Expand Up @@ -148,14 +149,49 @@ func printLoadCatchpointProgressLine(progress int, barLength int, dld int64) {
fmt.Printf(escapeCursorUp+escapeDeleteLine+outString+" %s\n", formatSize(dld))
}

func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.CatchpointCatchupAccessor, tarFile io.Reader, tarSize int64) (fileHeader ledger.CatchpointFileHeader, err error) {
func isGzipCompressed(catchpointReader *bufio.Reader, catchpointFileSize int64) bool {
const gzipPrefixSize = 2
const gzipPrefix = "\x1F\x8B"

if catchpointFileSize < gzipPrefixSize {
return false
}

prefixBytes, err := catchpointReader.Peek(gzipPrefixSize)

if err != nil {
return false
}

return prefixBytes[0] == gzipPrefix[0] && prefixBytes[1] == gzipPrefix[1]
}

// getCatchpointTarReader wraps the buffered catchpoint stream in a tar
// reader, transparently inserting a gzip decompressor first when the
// stream carries the gzip magic bytes (a .tar.gz catchpoint). It returns
// an error only when the gzip header fails to parse.
func getCatchpointTarReader(catchpointReader *bufio.Reader, catchpointFileSize int64) (*tar.Reader, error) {
	if !isGzipCompressed(catchpointReader, catchpointFileSize) {
		// Plain .tar stream: hand it to the tar reader directly.
		return tar.NewReader(catchpointReader), nil
	}

	gzipReader, gzErr := gzip.NewReader(catchpointReader)
	if gzErr != nil {
		return nil, gzErr
	}

	return tar.NewReader(gzipReader), nil
}

func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.CatchpointCatchupAccessor, catchpointFile io.Reader, catchpointFileSize int64) (fileHeader ledger.CatchpointFileHeader, err error) {
fmt.Printf("\n")
printLoadCatchpointProgressLine(0, 50, 0)
lastProgressUpdate := time.Now()
progress := uint64(0)
defer printLoadCatchpointProgressLine(0, 0, 0)

tarReader := tar.NewReader(tarFile)
catchpointReader := bufio.NewReader(catchpointFile)
tarReader, err := getCatchpointTarReader(catchpointReader, catchpointFileSize)
if err != nil {
return fileHeader, err
}

var downloadProgress ledger.CatchpointCatchupAccessorProgress
for {
header, err := tarReader.Next()
Expand Down Expand Up @@ -190,9 +226,9 @@ func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.Catc
// we already know it's valid, since we validated that above.
protocol.Decode(balancesBlockBytes, &fileHeader)
}
if time.Since(lastProgressUpdate) > 50*time.Millisecond && tarSize > 0 {
if time.Since(lastProgressUpdate) > 50*time.Millisecond && catchpointFileSize > 0 {
lastProgressUpdate = time.Now()
printLoadCatchpointProgressLine(int(float64(progress)*50.0/float64(tarSize)), 50, int64(progress))
printLoadCatchpointProgressLine(int(float64(progress)*50.0/float64(catchpointFileSize)), 50, int64(progress))
}
}
}
Expand Down