From f66f50c345c22c358ad93bcc5b9fe9087c6e01f9 Mon Sep 17 00:00:00 2001 From: Derek Leung Date: Thu, 13 Jun 2019 14:15:11 -0400 Subject: [PATCH 1/2] Support streaming in the coroner debug tool. This commit allows streaming cadaver files into the coroner debug tool. This improves the memory footprint of processing of large cadaver files, and it also allows a user or other tools to inspect the output of coroner as quickly as it is processed. This commit removes support for relative round bounds for trimming coroner, since end-relative round bounds are hard to compute while streaming. --- agreement/actor.go | 4 +- agreement/autopsy.go | 540 +++++++++++++++++++++++------------------- agreement/trace.go | 25 +- debug/coroner/main.go | 128 +++------- 4 files changed, 353 insertions(+), 344 deletions(-) diff --git a/agreement/actor.go b/agreement/actor.go index 195edb4d29..20f066c9da 100644 --- a/agreement/actor.go +++ b/agreement/actor.go @@ -135,11 +135,11 @@ type ioLoggedActor struct { func (l ioLoggedActor) handle(h routerHandle, e event) []action { if l.tracer.level >= top { - fmt.Printf("%23v => %23v: %v\n", "", l.T(), e) + fmt.Fprintf(l.tracer.w, "%23v => %23v: %v\n", "", l.T(), e) } a := l.checkedActor.handle(h, e) if l.tracer.level >= top { - fmt.Printf("%23v <= %23v: %v\n", "", l.T(), a) + fmt.Fprintf(l.tracer.w, "%23v <= %23v: %v\n", "", l.T(), a) } return a } diff --git a/agreement/autopsy.go b/agreement/autopsy.go index 81a3746130..00a2be71f8 100644 --- a/agreement/autopsy.go +++ b/agreement/autopsy.go @@ -21,6 +21,7 @@ import ( "io" "os" + "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" ) @@ -33,44 +34,42 @@ import ( type Autopsy struct { io.Reader io.Closer -} -// PrepareAutopsyFromInputStream prepares an autopsy from std in. -func PrepareAutopsyFromInputStream() (*Autopsy, error) { - a := new(Autopsy) - a.Reader = os.Stdin - a.Closer = os.Stdin - return a, nil + cdvs <-chan cdvInstance } -type multiCloser struct { - closers []io.Closer +// AutopsyBounds defines the range of rounds and periods spanned by a single +// invocation of a cadaver-generating process. +type AutopsyBounds struct { + // Start and End are inclusive here. + StartRound uint64 + StartPeriod uint64 + EndRound uint64 + EndPeriod uint64 } -func (m *multiCloser) Close() error { - for _, c := range m.closers { - err := c.Close() - if err != nil { - return err - } - } - return nil -} - -// MultiCloser returns a Closer that closes all the given closers. -func MultiCloser(closers ...io.Closer) io.Closer { - r := make([]io.Closer, len(closers)) - copy(r, closers) - return &multiCloser{r} +// PrepareAutopsyFromStream prepares an autopsy from a given ReadCloser. +// +// nextBounds is called with a sequence number for each new invocation of a +// cadaver-generating process (a "run"). +// +// done is called with the total number of runs and any error encountered while +// performing the autopsy. +func PrepareAutopsyFromStream(stream io.ReadCloser, nextBounds func(int, AutopsyBounds), done func(int, error)) (*Autopsy, error) { + return prepareStreamingAutopsy(stream, stream, nextBounds, done), nil } // PrepareAutopsy prepares an autopsy from a cadaver filename. -func PrepareAutopsy(cadaverBaseFilename string) (*Autopsy, error) { +// +// nextBounds is called with a sequence number for each new invocation of a +// cadaver-generating process (a "run"). +// +// done is called with the total number of runs and any error encountered while +// performing the autopsy. +func PrepareAutopsy(cadaverBaseFilename string, nextBounds func(int, AutopsyBounds), done func(int, error)) (*Autopsy, error) { name0 := cadaverBaseFilename + ".archive" // read the archive file first name1 := cadaverBaseFilename - a := new(Autopsy) - in1, err := os.Open(name1) if err != nil { return nil, err @@ -79,272 +78,168 @@ func PrepareAutopsy(cadaverBaseFilename string) (*Autopsy, error) { if err != nil { if os.IsNotExist(err) { // only one file created - a.Reader = in1 - a.Closer = in1 - return a, nil + return prepareStreamingAutopsy(in1, in1, nextBounds, done), nil } return nil, err } - a.Reader = io.MultiReader(in0, in1) - a.Closer = MultiCloser(in0, in1) - return a, nil -} -// ExtractCdvs returns all the autopsied cadaver sequences contained in an autopsy. -func (a *Autopsy) ExtractCdvs() (seqs []AutopsiedCdv, reterr error) { - for { - s, _, err := a.ExtractNextCdv(nil) - if err != nil { - reterr = err - return - } - if s.Empty() { - return - } - seqs = append(seqs, s) - } + return prepareStreamingAutopsy(io.MultiReader(in0, in1), makeMultiCloser(in0, in1), nextBounds, done), nil } -// ExtractNextCdv extracts the next AutopsiedCdv from an Autopsy and calls the -// given callback every time it extracts a single AutopsyTrace. -// -// headSkipped indicates how many events were skipped from the head of -// the cadaver. -// -// AutopsiedCdv may be partial - that is, it may not have a metadata entry, esp. if the archive -// file was too big and overwritten. -// -// traces may be set if reterr != nil. -func (a *Autopsy) ExtractNextCdv(h func(AutopsyTrace) (bool, error)) (aCdv AutopsiedCdv, headSkipped int, reterr error) { - recording := false - var acc AutopsyTrace - var accs AutopsyTraceSeq - - var err error - defer func() { - if recording { - accs = append(accs, acc) - if h != nil { - _, reterr = h(acc) - } - } - if err != nil && err.Error() == "EOF" { - reterr = nil - } - aCdv.T = accs - }() +type multiCloser struct { + closers []io.Closer +} - for { // terminates automatically on EOF - var t cadaverEntryType - err = protocol.DecodeStream(a, &t) +func (m *multiCloser) Close() error { + for _, c := range m.closers { + err := c.Close() if err != nil { - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode cadaverEntryType: %v", err) - return + return err } + } + return nil +} - switch t { - case cadaverEOSEntry: - // if cadaver sequence, terminate. This indicates a crash, or any new cadaver process. - // else, no-op. - if recording { - return - } - case cadaverPlayerEntry: - if recording { - if len(acc.e) != len(acc.a) && len(acc.e) != len(acc.a)+1 { // last event may have resulted in a process failure - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: events do not align with actions: %d != %d (+1)", len(acc.e), len(acc.a)) - return - } - accs = append(accs, acc) - if h != nil { - keepGoing, err := h(acc) - if err != nil || !keepGoing { - reterr = err - return - } - } - } - recording = true +// makeMultiCloser returns a Closer that closes all the given closers. +func makeMultiCloser(closers ...io.Closer) io.Closer { + r := make([]io.Closer, len(closers)) + copy(r, closers) + return &multiCloser{r} +} - acc = AutopsyTrace{} - err = protocol.DecodeStream(a, &acc.x) - if err != nil { - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode player: %v", err) - return - } +type autopsyTrace struct { + x player + m CadaverMetadata - if len(accs) == 0 { - aCdv.StartRound = int64(acc.x.Round) - aCdv.StartPeriod = int64(acc.x.Period) - } - aCdv.EndRound = int64(acc.x.Round) - aCdv.EndPeriod = int64(acc.x.Period) - case cadaverEventEntry: - var et eventType - err = protocol.DecodeStream(a, &et) - if err != nil { - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode eventType: %v", err) - return - } + p <-chan autopsyPair +} - e := zeroEvent(et) - err = protocol.DecodeStream(a, &e) - if err != nil { - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode event: %v", err) - return - } +type cdvInstance <-chan autopsyTrace - if recording { - acc.e = append(acc.e, e) - } else { - headSkipped++ - } +func prepareStreamingAutopsy(r io.Reader, c io.Closer, nextBounds func(int, AutopsyBounds), done func(int, error)) *Autopsy { + a := new(Autopsy) + a.Reader = r + a.Closer = c - case cadaverActionEntry: - var n int - err = protocol.DecodeStream(a, &n) - if err != nil { - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode number of actions: %v", err) - } + ch := make(chan cdvInstance) + go func() { + defer func() { + close(ch) + }() - var as []action - for i := 0; i < n; i++ { - var at actionType - err = protocol.DecodeStream(a, &at) - if err != nil { - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode actionType: %v", err) - return - } + for n := 0; ; n++ { + tch := make(chan autopsyTrace) + ch <- tch - zA := zeroAction(at) - err = protocol.DecodeStream(a, &zA) - if err != nil { - fmt.Printf("Action type: %v\n", at.String()) - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode action: %v", err) - return - } + err, empty, bounds := a.extractNextCdv(tch) - as = append(as, zA) + if !empty { + nextBounds(n, bounds) } - if recording { - acc.a = append(acc.a, as) - } // headSkipped is accounted for already - - case cadaverMetaEntry: - // note that we can read multiple of these for a singe "cadaver seq" during normal operation if a sequence spans multiple - // files (due to fileTargetSize); the latest one gets printed (for now). - err = protocol.DecodeStream(a, &aCdv.M) if err != nil { - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode meta entry sequence number: %v", err) + close(tch) + done(n, err) + return + } + if empty { + close(tch) + done(n, nil) return } } - } + }() + a.cdvs = ch + return a } -// An AutopsyTrace is the explict trace extracted from a cadaver -// file for a single (round, period) pair. -type AutopsyTrace struct { - x player - e []event - a [][]action +type switchableWriter struct { + io.Writer + disabled bool } -// Dump is another convenience function for live streaming autopsy -func (a AutopsyTrace) Dump() { - fmt.Printf("autopsy: player state is %+v ", a.x) - - for i := range a.e { - fmt.Printf("e: %v\n", a.e[i]) - fmt.Printf("actual: %v\n", a.a[i]) - } +func (w *switchableWriter) Enable() { + w.disabled = false } -// An AutopsyTraceSeq is a slice of traces extracted from the autopsy. -type AutopsyTraceSeq []AutopsyTrace - -// An AutopsiedCdv is an ordered slice of AutopsyTraces, corresponding to -// one contiguous cadaver log sequence (e.g. before crashing) -type AutopsiedCdv struct { - T AutopsyTraceSeq - M CadaverMetadata - StartRound int64 - StartPeriod int64 - EndRound int64 - EndPeriod int64 +func (w *switchableWriter) Disable() { + w.disabled = true } -// Empty returns true if AutopsiedCdv is empty (e.g. read from empty autopsy file) -func (seq AutopsiedCdv) Empty() bool { - return seq.T == nil +func (w switchableWriter) Write(p []byte) (n int, err error) { + if w.disabled { + return len(p), nil + } + return w.Writer.Write(p) } -// FilterBefore removes all traces smaller than the given round. Returns -// trimmed sequence and the first round of the first trace in the trimmed sequence. -func (seq AutopsyTraceSeq) FilterBefore(first int64) (AutopsyTraceSeq, int64) { - var nextFirstRound int64 - for i := range seq { - nextFirstRound = int64(seq[i].x.Round) - if nextFirstRound >= first { - // we want to keep seq[i] - return seq[i:], nextFirstRound - } - } - return nil, nextFirstRound +type switchableWriteCloser struct { + switchableWriter + io.Closer } -// FilterAfter removes all traces larger than the given round. Returns -// a trimmed seq and the last round of the last trace in the trimmed seq. -func (seq AutopsyTraceSeq) FilterAfter(last int64) (AutopsyTraceSeq, int64) { - var prevLastRound int64 - for i := range seq { - if int64(seq[i].x.Round) > last { - // discard seq[i] and everything after; return round of seq[i-1] - return seq[:i], prevLastRound - } - prevLastRound = int64(seq[i].x.Round) - } - // nothing to discard... - return seq[:], prevLastRound +// AutopsyFilter represents a window of rounds to be filtered from the autopsy +// output. +type AutopsyFilter struct { + Enabled bool // do not filter if this is false + First basics.Round // first round to emit output for; inclusive + Last basics.Round // last round to emit output for; inclusive } // DumpString dumps a textual representation of the AutopsyCdvs to the // given io.Writer. -func DumpString(cdvs []AutopsiedCdv, w io.Writer) { +func (a *Autopsy) DumpString(filter AutopsyFilter, w0 io.Writer) (version string) { + w := &switchableWriter{Writer: w0} var playerTracer tracer playerTracer.level = all playerTracer.log = serviceLogger{logging.Base()} + playerTracer.w = w var router rootRouter // TODO this could become inaccurate with orphaned events - for _, aCdv := range cdvs { - fmt.Fprintf(w, "autopsy: metadata: %v >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", aCdv.M) + for cdv := range a.cdvs { + first := true + + for tr := range cdv { + if first { + first = false + fmt.Fprintf(w, "autopsy: metadata: %v >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", tr.m) + version = tr.m.VersionCommitHash + } - for _, tr := range aCdv.T { player := tr.x + + if filter.Enabled { + if player.Round < filter.First || player.Round > filter.Last { + w.Disable() + } else { + w.Enable() + } + } + fmt.Fprintln(w, "autopsy:-") fmt.Fprintln(w, "autopsy:===================================") - DumpPlayerStr(w, player, router, "actual") + dumpPlayerStr(w, player, router, "actual") var p actor = ioLoggedActor{checkedActor{actor: &player, actorContract: playerContract{}}, playerTracer} router.root = p - for i, e := range tr.e { - player, _ = router.submitTop(&playerTracer, player, e) - if i == len(tr.a) { + for pair := range tr.p { + player, _ = router.submitTop(&playerTracer, player, pair.e) + if !pair.aok { break } - fmt.Fprintf(w, "actual: %v\n", tr.a[i]) + fmt.Fprintf(w, "actual: %v\n", pair.a) fmt.Fprintln(w, "autopsy:===================================") - DumpPlayerStr(w, player, router, "predicted") + dumpPlayerStr(w, player, router, "predicted") } } } + return } -// DumpPlayerStr prints useful state of the player, tagging the output with string -func DumpPlayerStr(w io.Writer, p player, r rootRouter, tag string) { +// dumpPlayerStr prints useful state of the player, tagging the output with string. +func dumpPlayerStr(w io.Writer, p player, r rootRouter, tag string) { playerCopy := p playerCopy.Pending = proposalTable{} fmt.Fprintf(w, "autopsy: (%s) player state is %+v (len(player.Pending = %d))\n", tag, playerCopy, len(p.Pending.Pending)) @@ -392,34 +287,197 @@ func DumpPlayerStr(w io.Writer, p player, r rootRouter, tag string) { // DumpMessagePack dumps a msgpack representation of the AutopsiedCdvs to the // given io.Writer. -func DumpMessagePack(cdvs []AutopsiedCdv, w io.WriteCloser) { +func (a *Autopsy) DumpMessagePack(filter AutopsyFilter, w0 io.WriteCloser) (version string) { + w := &switchableWriteCloser{switchableWriter: switchableWriter{Writer: w0}, Closer: w0} var playerTracer tracer playerTracer.log = serviceLogger{logging.Base()} + playerTracer.w = w var router rootRouter // TODO this could become inaccurate with orphaned events - for _, aCdv := range cdvs { + for cdv := range a.cdvs { + first := true + // reset cadaver for every cdv seq (so we don't miss caching player state) c := cadaver{} c.overrideSetup = true c.out = &cadaverHandle{WriteCloser: w} - protocol.EncodeStream(c.out, cadaverMetaEntry) - protocol.EncodeStream(c.out, aCdv.M) + for tr := range cdv { + if first { + first = false + protocol.EncodeStream(c.out, cadaverMetaEntry) + protocol.EncodeStream(c.out, tr.m) + version = tr.m.VersionCommitHash + } - for _, tr := range aCdv.T { player := tr.x var p actor = checkedActor{actor: &player, actorContract: playerContract{}} router.root = p - for i, e := range tr.e { - c.traceInput(player.Round, player.Period, player, tr.e[i]) - if i < len(tr.a) { - c.traceOutput(player.Round, player.Period, player, tr.a[i]) + if filter.Enabled { + if player.Round < filter.First || player.Round > filter.Last { + w.Disable() + } else { + w.Enable() + } + } + + for pair := range tr.p { + c.traceInput(player.Round, player.Period, player, pair.e) + if pair.aok { + c.traceOutput(player.Round, player.Period, player, pair.a) } - player, _ = router.submitTop(&playerTracer, player, e) + player, _ = router.submitTop(&playerTracer, player, pair.e) // TODO can check correspondence here } } protocol.EncodeStream(c.out, cadaverEOSEntry) } + return +} + +type autopsyPair struct { + e event + a []action + aok bool +} + +func (a *Autopsy) extractNextCdv(ch chan<- autopsyTrace) (reterr error, empty bool, bounds AutopsyBounds) { + empty = true + + recording := false + var acc autopsyTrace + + var pch chan autopsyPair + var err error + defer func() { + if recording { + empty = false + close(pch) + close(ch) + } + if err != nil && err.Error() == "EOF" { + reterr = nil + } + }() + + expectAction := false // if false, event is expected; else action + var accp autopsyPair + + for { // terminates automatically on EOF + var t cadaverEntryType + err = protocol.DecodeStream(a, &t) + if err != nil { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode cadaverEntryType: %v", err) + return + } + + switch t { + case cadaverEOSEntry: + // if cadaver sequence, terminate. This indicates a crash, or any new cadaver process. + // else, no-op. + if recording { + return + } + case cadaverPlayerEntry: + if recording { + empty = false + close(pch) + } + + pch = make(chan autopsyPair, 0) + acc = autopsyTrace{m: acc.m, p: pch} + err = protocol.DecodeStream(a, &acc.x) + if err != nil { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode player: %v", err) + return + } + expectAction = false + + bounds.EndRound = uint64(acc.x.Round) + bounds.EndPeriod = uint64(acc.x.Period) + + if !recording { + // first time + bounds.StartRound = uint64(acc.x.Round) + bounds.StartPeriod = uint64(acc.x.Period) + } + recording = true + + ch <- acc + + case cadaverEventEntry: + var et eventType + err = protocol.DecodeStream(a, &et) + if err != nil { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode eventType: %v", err) + return + } + + e := zeroEvent(et) + err = protocol.DecodeStream(a, &e) + if err != nil { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode event: %v", err) + return + } + + if recording { + if expectAction { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: expected action but got event") + return + } + accp.e = e + expectAction = !expectAction + } + + case cadaverActionEntry: + var n int + err = protocol.DecodeStream(a, &n) + if err != nil { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode number of actions: %v", err) + } + + var as []action + for i := 0; i < n; i++ { + var at actionType + err = protocol.DecodeStream(a, &at) + if err != nil { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode actionType: %v", err) + return + } + + zA := zeroAction(at) + err = protocol.DecodeStream(a, &zA) + if err != nil { + fmt.Printf("Action type: %v\n", at.String()) + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode action: %v", err) + return + } + + as = append(as, zA) + } + + if recording { + if !expectAction { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: expected event but got action") + return + } + accp.aok = true + accp.a = as + pch <- accp + + accp = autopsyPair{} + expectAction = !expectAction + } + + case cadaverMetaEntry: + // note that we can read multiple of these for a singe "cadaver seq" during normal operation if a sequence spans multiple + // files (due to fileTargetSize); the latest one gets printed (for now). + err = protocol.DecodeStream(a, &acc.m) + if err != nil { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode meta entry sequence number: %v", err) + return + } + } + } } diff --git a/agreement/trace.go b/agreement/trace.go index f570b9cab8..92a1cd7d51 100644 --- a/agreement/trace.go +++ b/agreement/trace.go @@ -18,6 +18,8 @@ package agreement import ( "fmt" + "io" + "os" "strings" "time" @@ -51,6 +53,8 @@ type tracer struct { log serviceLogger + w io.Writer + // Tracer is now a little stateful (for ad-hoc logging) // parent state machines/routers are responsible for making sure tracer // picks up the right state. Optional. @@ -74,6 +78,7 @@ func makeTracer(log serviceLogger, cadaverFilename string, cadaverSizeTarget uin t.log = log t.verboseReports = verboseReportFlag t.timingReports = timingReportFlag + t.w = os.Stdout fileSizeTarget := int64(cadaverSizeTarget) if fileSizeTarget == 0 { @@ -126,21 +131,21 @@ func (t *tracer) setMetadata(metadata tracerMetadata) { func (t *tracer) ein(src, dest stateMachineTag, e event, r round, p period, s step) { t.seq++ if t.level >= all { - // fmt.Printf("%v %3v %23v -> %23v: %30v\n", t.tag, t.seq, src, dest, e) - fmt.Printf("%v] %23v -> %23v: %30v\n", t.tag, src, dest, e) + // fmt.Fprintf(t.w, "%v %3v %23v -> %23v: %30v\n", t.tag, t.seq, src, dest, e) + fmt.Fprintf(t.w, "%v] %23v -> %23v: %30v\n", t.tag, src, dest, e) } } func (t *tracer) eout(src, dest stateMachineTag, e event, r round, p period, s step) { t.seq++ if t.level >= all { - // fmt.Printf("%v %3v %23v <- %23v: %30v\n", t.tag, t.seq, src, dest, e) - fmt.Printf("%v] %23v <- %23v: %30v\n", t.tag, src, dest, e) + // fmt.Fprintf(t.w, "%v %3v %23v <- %23v: %30v\n", t.tag, t.seq, src, dest, e) + fmt.Fprintf(t.w, "%v] %23v <- %23v: %30v\n", t.tag, src, dest, e) } else if t.level >= key { switch e.t() { case proposalAccepted, proposalCommittable, softThreshold, certThreshold, nextThreshold: - // fmt.Printf("%v %3v %23v <- %23v: %30v\n", t.tag, t.seq, src, dest, e) - fmt.Printf("%v] %23v <- %23v: %30v\n", t.tag, src, dest, e) + // fmt.Fprintf(t.w, "%v %3v %23v <- %23v: %30v\n", t.tag, t.seq, src, dest, e) + fmt.Fprintf(t.w, "%v] %23v <- %23v: %30v\n", t.tag, src, dest, e) } } } @@ -148,8 +153,8 @@ func (t *tracer) eout(src, dest stateMachineTag, e event, r round, p period, s s func (t *tracer) ainTop(src, dest stateMachineTag, state player, e event, r round, p period, s step) { t.seq++ if t.level >= top { - // fmt.Printf("%v %3v %23v => %23v: %30v\n", t.tag, t.seq, src, dest, e) - fmt.Printf("%v] %23v => %23v: %30v\n", t.tag, src, dest, e) + // fmt.Fprintf(t.w, "%v %3v %23v => %23v: %30v\n", t.tag, t.seq, src, dest, e) + fmt.Fprintf(t.w, "%v] %23v => %23v: %30v\n", t.tag, src, dest, e) } } @@ -165,8 +170,8 @@ func (t *tracer) aoutTop(src, dest stateMachineTag, as []action, r round, p peri t.seq++ if t.level >= top { - // fmt.Printf("%v %3v %23v <= %23v: %.30v\n", t.tag, t.seq, src, dest, as) - fmt.Printf("%v] %23v <= %23v: %.30v\n", t.tag, src, dest, as) + // fmt.Fprintf(t.w, "%v %3v %23v <= %23v: %.30v\n", t.tag, t.seq, src, dest, as) + fmt.Fprintf(t.w, "%v] %23v <= %23v: %.30v\n", t.tag, src, dest, as) } } diff --git a/debug/coroner/main.go b/debug/coroner/main.go index 44b6b6d9c4..91db4f25d8 100644 --- a/debug/coroner/main.go +++ b/debug/coroner/main.go @@ -26,53 +26,45 @@ import ( "github.com/algorand/go-algorand/agreement" "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/data/basics" ) var numRegex = regexp.MustCompile(`^\d+$`) -var posOffRegex = regexp.MustCompile(`^\+\d+$`) -var negOffRegex = regexp.MustCompile(`^\-\d+$`) -var filename = flag.String("file", "", "Name of the input cadaver file") +var filename = flag.String("file", "", "Name of the input cadaver file (otherwise, use stdin)") var versionCheck = flag.Bool("version", false, "Display current coroner build version and exit") - var printmsgpack = flag.Bool("msgpack", false, "If provided, emit msgpack instead of a string") -// note: these also take relative offsets given by "+" or "-" symbols -// e.g., the command -// coroner --skip-head -10 -// will give the last 10 rounds of the coroner. -// If relative is set, the removal is done relative to the minimum round in the -// trace if the given round is nonnegative. Otherwise, the removal is relative -// to the maximum round in the trace. var skipHead = flag.String("skip-head", "", "The first round to trim before") var skipTail = flag.String("skip-tail", "", "The last round to trim after") -func mustParse(data []byte) int64 { - x, err := strconv.ParseInt(string(data), 10, 64) +func mustParse(data []byte) uint64 { + x, err := strconv.ParseUint(string(data), 10, 64) if err != nil { log.Fatalf(`failed to parse round bound in "%s": %s`, string(data), err) } return x } -func parseRoundBound(s string) (bound int64, relative bool) { - data := []byte(s) - signfact := int64(1) - - switch { - case s == "": - case numRegex.Match(data): - bound = mustParse(numRegex.Find(data)) - case negOffRegex.Match(data): - signfact = -1 - fallthrough - case posOffRegex.Match(data): - relative = true - bound = mustParse(numRegex.Find(data[1:])) * signfact - default: - log.Fatalf(`failed to parse round bound in "%s": string does not match regex "^(+|-)\d+$"`, s) +func parseRoundBound(s string) uint64 { + if !numRegex.Match([]byte(s)) { + log.Fatalf(`failed to parse round bound in "%s": string does not match regex "^\d+$"`, s) + } + return mustParse(numRegex.Find([]byte(s))) +} + +func done(n int, err error) { + if n == 0 { + log.Println("coroner: no cadavers autopsied") + } + + if err != nil { + log.Println("coroner: failed to extract full autopsy trace:", err) } - return +} + +func nextBounds(i int, bounds agreement.AutopsyBounds) { + log.Printf("cadaver seq: %d\tstart(r,p): (%d,%d)\tend(r,p): (%d,%d)\n", i, bounds.StartRound, bounds.StartPeriod, bounds.EndRound, bounds.EndPeriod) } func main() { @@ -89,80 +81,34 @@ func main() { if *filename == "" { log.Println("coroner: no filename provided; reading from stdin...") - autopsy, err = agreement.PrepareAutopsyFromInputStream() + autopsy, err = agreement.PrepareAutopsyFromStream(os.Stdin, nextBounds, done) } else { - autopsy, err = agreement.PrepareAutopsy(*filename) + autopsy, err = agreement.PrepareAutopsy(*filename, nextBounds, done) } if err != nil { log.Fatalln("coroner: failed to prepare autopsy:", err) } defer autopsy.Close() - headRound, headRelative := parseRoundBound(*skipHead) - tailRound, tailRelative := parseRoundBound(*skipTail) - - autopsiedCdvs, err := autopsy.ExtractCdvs() - if err != nil { - log.Println("coroner: failed to extract full autopsy trace:", err) - log.Println("coroner: continuing after error...") - } - - if len(autopsiedCdvs) < 1 { - log.Println("coroner: no cadavers autopsied") - return - } - - firstMeta := autopsiedCdvs[0].M - log.Printf("coroner: Cadaver file generated with commit hash:\n%s\n", firstMeta.VersionCommitHash) - if firstMeta.VersionCommitHash != version.GetCommitHash() { - log.Printf("coroner: Cadaver version mismatches coroner version:\n(%s (cadaver) != %s (coroner))\n", firstMeta.VersionCommitHash, version.GetCommitHash()) - } - - cachedStartRound := autopsiedCdvs[0].StartRound + var filter agreement.AutopsyFilter if *skipHead != "" { - numCdvs := len(autopsiedCdvs) - first := headRound - if headRelative { - if headRound >= 0 { - first = cachedStartRound + headRound - } else { - first = autopsiedCdvs[numCdvs-1].EndRound + headRound - } - } - for i := range autopsiedCdvs { - if autopsiedCdvs[i].EndRound < first { - autopsiedCdvs = autopsiedCdvs[i+1:] - break - } - } - autopsiedCdvs[0].T, autopsiedCdvs[0].StartRound = autopsiedCdvs[0].T.FilterBefore(first) + filter.Enabled = true + filter.First = basics.Round(parseRoundBound(*skipHead)) } if *skipTail != "" { - numCdvs := len(autopsiedCdvs) - last := tailRound - if tailRelative { - if tailRound >= 0 { - last = cachedStartRound + tailRound - } else { - last = autopsiedCdvs[numCdvs-1].EndRound + tailRound - } - } - for i := range autopsiedCdvs { - if autopsiedCdvs[i].StartRound > last { - autopsiedCdvs = autopsiedCdvs[:i] - break - } - } - end := len(autopsiedCdvs) - autopsiedCdvs[end].T, autopsiedCdvs[end].EndRound = autopsiedCdvs[end].T.FilterAfter(last) + filter.Enabled = true + filter.Last = basics.Round(parseRoundBound(*skipTail)) } - for i := range autopsiedCdvs { - log.Printf("Cadaver Seq: %d\tstart: %d\tend: %d\n", i, autopsiedCdvs[i].StartRound, autopsiedCdvs[i].EndRound) - } + var commitHash string if *printmsgpack { - agreement.DumpMessagePack(autopsiedCdvs, os.Stdout) + commitHash = autopsy.DumpMessagePack(filter, os.Stdout) } else { - agreement.DumpString(autopsiedCdvs, os.Stdout) + commitHash = autopsy.DumpString(filter, os.Stdout) } + if commitHash != version.GetCommitHash() { + log.Printf("coroner: cadaver version mismatches coroner version:\n(%s (cadaver) != %s (coroner))\n", commitHash, version.GetCommitHash()) + } + + return } From a99f6ba70990e5067122c964f953a98b640ab321 Mon Sep 17 00:00:00 2001 From: Derek Leung Date: Fri, 14 Jun 2019 15:58:50 -0400 Subject: [PATCH 2/2] make sanity --- agreement/autopsy.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agreement/autopsy.go b/agreement/autopsy.go index 00a2be71f8..549a916729 100644 --- a/agreement/autopsy.go +++ b/agreement/autopsy.go @@ -131,7 +131,7 @@ func prepareStreamingAutopsy(r io.Reader, c io.Closer, nextBounds func(int, Auto tch := make(chan autopsyTrace) ch <- tch - err, empty, bounds := a.extractNextCdv(tch) + bounds, empty, err := a.extractNextCdv(tch) if !empty { nextBounds(n, bounds) @@ -342,7 +342,7 @@ type autopsyPair struct { aok bool } -func (a *Autopsy) extractNextCdv(ch chan<- autopsyTrace) (reterr error, empty bool, bounds AutopsyBounds) { +func (a *Autopsy) extractNextCdv(ch chan<- autopsyTrace) (bounds AutopsyBounds, empty bool, reterr error) { empty = true recording := false