diff --git a/agreement/actor.go b/agreement/actor.go index 195edb4d29..20f066c9da 100644 --- a/agreement/actor.go +++ b/agreement/actor.go @@ -135,11 +135,11 @@ type ioLoggedActor struct { func (l ioLoggedActor) handle(h routerHandle, e event) []action { if l.tracer.level >= top { - fmt.Printf("%23v => %23v: %v\n", "", l.T(), e) + fmt.Fprintf(l.tracer.w, "%23v => %23v: %v\n", "", l.T(), e) } a := l.checkedActor.handle(h, e) if l.tracer.level >= top { - fmt.Printf("%23v <= %23v: %v\n", "", l.T(), a) + fmt.Fprintf(l.tracer.w, "%23v <= %23v: %v\n", "", l.T(), a) } return a } diff --git a/agreement/autopsy.go b/agreement/autopsy.go index 81a3746130..549a916729 100644 --- a/agreement/autopsy.go +++ b/agreement/autopsy.go @@ -21,6 +21,7 @@ import ( "io" "os" + "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" ) @@ -33,44 +34,42 @@ import ( type Autopsy struct { io.Reader io.Closer -} -// PrepareAutopsyFromInputStream prepares an autopsy from std in. -func PrepareAutopsyFromInputStream() (*Autopsy, error) { - a := new(Autopsy) - a.Reader = os.Stdin - a.Closer = os.Stdin - return a, nil + cdvs <-chan cdvInstance } -type multiCloser struct { - closers []io.Closer +// AutopsyBounds defines the range of rounds and periods spanned by a single +// invocation of a cadaver-generating process. +type AutopsyBounds struct { + // Start and End are inclusive here. + StartRound uint64 + StartPeriod uint64 + EndRound uint64 + EndPeriod uint64 } -func (m *multiCloser) Close() error { - for _, c := range m.closers { - err := c.Close() - if err != nil { - return err - } - } - return nil -} - -// MultiCloser returns a Closer that closes all the given closers. -func MultiCloser(closers ...io.Closer) io.Closer { - r := make([]io.Closer, len(closers)) - copy(r, closers) - return &multiCloser{r} +// PrepareAutopsyFromStream prepares an autopsy from a given ReadCloser. +// +// nextBounds is called with a sequence number for each new invocation of a +// cadaver-generating process (a "run"). +// +// done is called with the total number of runs and any error encountered while +// performing the autopsy. +func PrepareAutopsyFromStream(stream io.ReadCloser, nextBounds func(int, AutopsyBounds), done func(int, error)) (*Autopsy, error) { + return prepareStreamingAutopsy(stream, stream, nextBounds, done), nil } // PrepareAutopsy prepares an autopsy from a cadaver filename. -func PrepareAutopsy(cadaverBaseFilename string) (*Autopsy, error) { +// +// nextBounds is called with a sequence number for each new invocation of a +// cadaver-generating process (a "run"). +// +// done is called with the total number of runs and any error encountered while +// performing the autopsy. +func PrepareAutopsy(cadaverBaseFilename string, nextBounds func(int, AutopsyBounds), done func(int, error)) (*Autopsy, error) { name0 := cadaverBaseFilename + ".archive" // read the archive file first name1 := cadaverBaseFilename - a := new(Autopsy) - in1, err := os.Open(name1) if err != nil { return nil, err @@ -79,272 +78,168 @@ func PrepareAutopsy(cadaverBaseFilename string) (*Autopsy, error) { if err != nil { if os.IsNotExist(err) { // only one file created - a.Reader = in1 - a.Closer = in1 - return a, nil + return prepareStreamingAutopsy(in1, in1, nextBounds, done), nil } return nil, err } - a.Reader = io.MultiReader(in0, in1) - a.Closer = MultiCloser(in0, in1) - return a, nil -} -// ExtractCdvs returns all the autopsied cadaver sequences contained in an autopsy. -func (a *Autopsy) ExtractCdvs() (seqs []AutopsiedCdv, reterr error) { - for { - s, _, err := a.ExtractNextCdv(nil) - if err != nil { - reterr = err - return - } - if s.Empty() { - return - } - seqs = append(seqs, s) - } + return prepareStreamingAutopsy(io.MultiReader(in0, in1), makeMultiCloser(in0, in1), nextBounds, done), nil } -// ExtractNextCdv extracts the next AutopsiedCdv from an Autopsy and calls the -// given callback every time it extracts a single AutopsyTrace. -// -// headSkipped indicates how many events were skipped from the head of -// the cadaver. -// -// AutopsiedCdv may be partial - that is, it may not have a metadata entry, esp. if the archive -// file was too big and overwritten. -// -// traces may be set if reterr != nil. -func (a *Autopsy) ExtractNextCdv(h func(AutopsyTrace) (bool, error)) (aCdv AutopsiedCdv, headSkipped int, reterr error) { - recording := false - var acc AutopsyTrace - var accs AutopsyTraceSeq - - var err error - defer func() { - if recording { - accs = append(accs, acc) - if h != nil { - _, reterr = h(acc) - } - } - if err != nil && err.Error() == "EOF" { - reterr = nil - } - aCdv.T = accs - }() +type multiCloser struct { + closers []io.Closer +} - for { // terminates automatically on EOF - var t cadaverEntryType - err = protocol.DecodeStream(a, &t) +func (m *multiCloser) Close() error { + for _, c := range m.closers { + err := c.Close() if err != nil { - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode cadaverEntryType: %v", err) - return + return err } + } + return nil +} - switch t { - case cadaverEOSEntry: - // if cadaver sequence, terminate. This indicates a crash, or any new cadaver process. - // else, no-op. - if recording { - return - } - case cadaverPlayerEntry: - if recording { - if len(acc.e) != len(acc.a) && len(acc.e) != len(acc.a)+1 { // last event may have resulted in a process failure - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: events do not align with actions: %d != %d (+1)", len(acc.e), len(acc.a)) - return - } - accs = append(accs, acc) - if h != nil { - keepGoing, err := h(acc) - if err != nil || !keepGoing { - reterr = err - return - } - } - } - recording = true +// makeMultiCloser returns a Closer that closes all the given closers. +func makeMultiCloser(closers ...io.Closer) io.Closer { + r := make([]io.Closer, len(closers)) + copy(r, closers) + return &multiCloser{r} +} - acc = AutopsyTrace{} - err = protocol.DecodeStream(a, &acc.x) - if err != nil { - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode player: %v", err) - return - } +type autopsyTrace struct { + x player + m CadaverMetadata - if len(accs) == 0 { - aCdv.StartRound = int64(acc.x.Round) - aCdv.StartPeriod = int64(acc.x.Period) - } - aCdv.EndRound = int64(acc.x.Round) - aCdv.EndPeriod = int64(acc.x.Period) - case cadaverEventEntry: - var et eventType - err = protocol.DecodeStream(a, &et) - if err != nil { - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode eventType: %v", err) - return - } + p <-chan autopsyPair +} - e := zeroEvent(et) - err = protocol.DecodeStream(a, &e) - if err != nil { - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode event: %v", err) - return - } +type cdvInstance <-chan autopsyTrace - if recording { - acc.e = append(acc.e, e) - } else { - headSkipped++ - } +func prepareStreamingAutopsy(r io.Reader, c io.Closer, nextBounds func(int, AutopsyBounds), done func(int, error)) *Autopsy { + a := new(Autopsy) + a.Reader = r + a.Closer = c - case cadaverActionEntry: - var n int - err = protocol.DecodeStream(a, &n) - if err != nil { - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode number of actions: %v", err) - } + ch := make(chan cdvInstance) + go func() { + defer func() { + close(ch) + }() - var as []action - for i := 0; i < n; i++ { - var at actionType - err = protocol.DecodeStream(a, &at) - if err != nil { - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode actionType: %v", err) - return - } + for n := 0; ; n++ { + tch := make(chan autopsyTrace) + ch <- tch - zA := zeroAction(at) - err = protocol.DecodeStream(a, &zA) - if err != nil { - fmt.Printf("Action type: %v\n", at.String()) - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode action: %v", err) - return - } + bounds, empty, err := a.extractNextCdv(tch) - as = append(as, zA) + if !empty { + nextBounds(n, bounds) } - if recording { - acc.a = append(acc.a, as) - } // headSkipped is accounted for already - - case cadaverMetaEntry: - // note that we can read multiple of these for a singe "cadaver seq" during normal operation if a sequence spans multiple - // files (due to fileTargetSize); the latest one gets printed (for now). - err = protocol.DecodeStream(a, &aCdv.M) if err != nil { - reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode meta entry sequence number: %v", err) + close(tch) + done(n, err) + return + } + if empty { + close(tch) + done(n, nil) return } } - } + }() + a.cdvs = ch + return a } -// An AutopsyTrace is the explict trace extracted from a cadaver -// file for a single (round, period) pair. -type AutopsyTrace struct { - x player - e []event - a [][]action +type switchableWriter struct { + io.Writer + disabled bool } -// Dump is another convenience function for live streaming autopsy -func (a AutopsyTrace) Dump() { - fmt.Printf("autopsy: player state is %+v ", a.x) - - for i := range a.e { - fmt.Printf("e: %v\n", a.e[i]) - fmt.Printf("actual: %v\n", a.a[i]) - } +func (w *switchableWriter) Enable() { + w.disabled = false } -// An AutopsyTraceSeq is a slice of traces extracted from the autopsy. -type AutopsyTraceSeq []AutopsyTrace - -// An AutopsiedCdv is an ordered slice of AutopsyTraces, corresponding to -// one contiguous cadaver log sequence (e.g. before crashing) -type AutopsiedCdv struct { - T AutopsyTraceSeq - M CadaverMetadata - StartRound int64 - StartPeriod int64 - EndRound int64 - EndPeriod int64 +func (w *switchableWriter) Disable() { + w.disabled = true } -// Empty returns true if AutopsiedCdv is empty (e.g. read from empty autopsy file) -func (seq AutopsiedCdv) Empty() bool { - return seq.T == nil +func (w switchableWriter) Write(p []byte) (n int, err error) { + if w.disabled { + return len(p), nil + } + return w.Writer.Write(p) } -// FilterBefore removes all traces smaller than the given round. Returns -// trimmed sequence and the first round of the first trace in the trimmed sequence. -func (seq AutopsyTraceSeq) FilterBefore(first int64) (AutopsyTraceSeq, int64) { - var nextFirstRound int64 - for i := range seq { - nextFirstRound = int64(seq[i].x.Round) - if nextFirstRound >= first { - // we want to keep seq[i] - return seq[i:], nextFirstRound - } - } - return nil, nextFirstRound +type switchableWriteCloser struct { + switchableWriter + io.Closer } -// FilterAfter removes all traces larger than the given round. Returns -// a trimmed seq and the last round of the last trace in the trimmed seq. -func (seq AutopsyTraceSeq) FilterAfter(last int64) (AutopsyTraceSeq, int64) { - var prevLastRound int64 - for i := range seq { - if int64(seq[i].x.Round) > last { - // discard seq[i] and everything after; return round of seq[i-1] - return seq[:i], prevLastRound - } - prevLastRound = int64(seq[i].x.Round) - } - // nothing to discard... - return seq[:], prevLastRound +// AutopsyFilter represents a window of rounds to be filtered from the autopsy +// output. +type AutopsyFilter struct { + Enabled bool // do not filter if this is false + First basics.Round // first round to emit output for; inclusive + Last basics.Round // last round to emit output for; inclusive } // DumpString dumps a textual representation of the AutopsyCdvs to the // given io.Writer. -func DumpString(cdvs []AutopsiedCdv, w io.Writer) { +func (a *Autopsy) DumpString(filter AutopsyFilter, w0 io.Writer) (version string) { + w := &switchableWriter{Writer: w0} var playerTracer tracer playerTracer.level = all playerTracer.log = serviceLogger{logging.Base()} + playerTracer.w = w var router rootRouter // TODO this could become inaccurate with orphaned events - for _, aCdv := range cdvs { - fmt.Fprintf(w, "autopsy: metadata: %v >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", aCdv.M) + for cdv := range a.cdvs { + first := true + + for tr := range cdv { + if first { + first = false + fmt.Fprintf(w, "autopsy: metadata: %v >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", tr.m) + version = tr.m.VersionCommitHash + } - for _, tr := range aCdv.T { player := tr.x + + if filter.Enabled { + if player.Round < filter.First || player.Round > filter.Last { + w.Disable() + } else { + w.Enable() + } + } + fmt.Fprintln(w, "autopsy:-") fmt.Fprintln(w, "autopsy:===================================") - DumpPlayerStr(w, player, router, "actual") + dumpPlayerStr(w, player, router, "actual") var p actor = ioLoggedActor{checkedActor{actor: &player, actorContract: playerContract{}}, playerTracer} router.root = p - for i, e := range tr.e { - player, _ = router.submitTop(&playerTracer, player, e) - if i == len(tr.a) { + for pair := range tr.p { + player, _ = router.submitTop(&playerTracer, player, pair.e) + if !pair.aok { break } - fmt.Fprintf(w, "actual: %v\n", tr.a[i]) + fmt.Fprintf(w, "actual: %v\n", pair.a) fmt.Fprintln(w, "autopsy:===================================") - DumpPlayerStr(w, player, router, "predicted") + dumpPlayerStr(w, player, router, "predicted") } } } + return } -// DumpPlayerStr prints useful state of the player, tagging the output with string -func DumpPlayerStr(w io.Writer, p player, r rootRouter, tag string) { +// dumpPlayerStr prints useful state of the player, tagging the output with string. +func dumpPlayerStr(w io.Writer, p player, r rootRouter, tag string) { playerCopy := p playerCopy.Pending = proposalTable{} fmt.Fprintf(w, "autopsy: (%s) player state is %+v (len(player.Pending = %d))\n", tag, playerCopy, len(p.Pending.Pending)) @@ -392,34 +287,197 @@ func DumpPlayerStr(w io.Writer, p player, r rootRouter, tag string) { // DumpMessagePack dumps a msgpack representation of the AutopsiedCdvs to the // given io.Writer. -func DumpMessagePack(cdvs []AutopsiedCdv, w io.WriteCloser) { +func (a *Autopsy) DumpMessagePack(filter AutopsyFilter, w0 io.WriteCloser) (version string) { + w := &switchableWriteCloser{switchableWriter: switchableWriter{Writer: w0}, Closer: w0} var playerTracer tracer playerTracer.log = serviceLogger{logging.Base()} + playerTracer.w = w var router rootRouter // TODO this could become inaccurate with orphaned events - for _, aCdv := range cdvs { + for cdv := range a.cdvs { + first := true + // reset cadaver for every cdv seq (so we don't miss caching player state) c := cadaver{} c.overrideSetup = true c.out = &cadaverHandle{WriteCloser: w} - protocol.EncodeStream(c.out, cadaverMetaEntry) - protocol.EncodeStream(c.out, aCdv.M) + for tr := range cdv { + if first { + first = false + protocol.EncodeStream(c.out, cadaverMetaEntry) + protocol.EncodeStream(c.out, tr.m) + version = tr.m.VersionCommitHash + } - for _, tr := range aCdv.T { player := tr.x var p actor = checkedActor{actor: &player, actorContract: playerContract{}} router.root = p - for i, e := range tr.e { - c.traceInput(player.Round, player.Period, player, tr.e[i]) - if i < len(tr.a) { - c.traceOutput(player.Round, player.Period, player, tr.a[i]) + if filter.Enabled { + if player.Round < filter.First || player.Round > filter.Last { + w.Disable() + } else { + w.Enable() + } + } + + for pair := range tr.p { + c.traceInput(player.Round, player.Period, player, pair.e) + if pair.aok { + c.traceOutput(player.Round, player.Period, player, pair.a) } - player, _ = router.submitTop(&playerTracer, player, e) + player, _ = router.submitTop(&playerTracer, player, pair.e) // TODO can check correspondence here } } protocol.EncodeStream(c.out, cadaverEOSEntry) } + return +} + +type autopsyPair struct { + e event + a []action + aok bool +} + +func (a *Autopsy) extractNextCdv(ch chan<- autopsyTrace) (bounds AutopsyBounds, empty bool, reterr error) { + empty = true + + recording := false + var acc autopsyTrace + + var pch chan autopsyPair + var err error + defer func() { + if recording { + empty = false + close(pch) + close(ch) + } + if err != nil && err.Error() == "EOF" { + reterr = nil + } + }() + + expectAction := false // if false, event is expected; else action + var accp autopsyPair + + for { // terminates automatically on EOF + var t cadaverEntryType + err = protocol.DecodeStream(a, &t) + if err != nil { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode cadaverEntryType: %v", err) + return + } + + switch t { + case cadaverEOSEntry: + // if cadaver sequence, terminate. This indicates a crash, or any new cadaver process. + // else, no-op. + if recording { + return + } + case cadaverPlayerEntry: + if recording { + empty = false + close(pch) + } + + pch = make(chan autopsyPair, 0) + acc = autopsyTrace{m: acc.m, p: pch} + err = protocol.DecodeStream(a, &acc.x) + if err != nil { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode player: %v", err) + return + } + expectAction = false + + bounds.EndRound = uint64(acc.x.Round) + bounds.EndPeriod = uint64(acc.x.Period) + + if !recording { + // first time + bounds.StartRound = uint64(acc.x.Round) + bounds.StartPeriod = uint64(acc.x.Period) + } + recording = true + + ch <- acc + + case cadaverEventEntry: + var et eventType + err = protocol.DecodeStream(a, &et) + if err != nil { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode eventType: %v", err) + return + } + + e := zeroEvent(et) + err = protocol.DecodeStream(a, &e) + if err != nil { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode event: %v", err) + return + } + + if recording { + if expectAction { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: expected action but got event") + return + } + accp.e = e + expectAction = !expectAction + } + + case cadaverActionEntry: + var n int + err = protocol.DecodeStream(a, &n) + if err != nil { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode number of actions: %v", err) + } + + var as []action + for i := 0; i < n; i++ { + var at actionType + err = protocol.DecodeStream(a, &at) + if err != nil { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode actionType: %v", err) + return + } + + zA := zeroAction(at) + err = protocol.DecodeStream(a, &zA) + if err != nil { + fmt.Printf("Action type: %v\n", at.String()) + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode action: %v", err) + return + } + + as = append(as, zA) + } + + if recording { + if !expectAction { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: expected event but got action") + return + } + accp.aok = true + accp.a = as + pch <- accp + + accp = autopsyPair{} + expectAction = !expectAction + } + + case cadaverMetaEntry: + // note that we can read multiple of these for a singe "cadaver seq" during normal operation if a sequence spans multiple + // files (due to fileTargetSize); the latest one gets printed (for now). + err = protocol.DecodeStream(a, &acc.m) + if err != nil { + reterr = fmt.Errorf("Autopsy.ExtractNextCdv: failed to decode meta entry sequence number: %v", err) + return + } + } + } } diff --git a/agreement/trace.go b/agreement/trace.go index f570b9cab8..92a1cd7d51 100644 --- a/agreement/trace.go +++ b/agreement/trace.go @@ -18,6 +18,8 @@ package agreement import ( "fmt" + "io" + "os" "strings" "time" @@ -51,6 +53,8 @@ type tracer struct { log serviceLogger + w io.Writer + // Tracer is now a little stateful (for ad-hoc logging) // parent state machines/routers are responsible for making sure tracer // picks up the right state. Optional. @@ -74,6 +78,7 @@ func makeTracer(log serviceLogger, cadaverFilename string, cadaverSizeTarget uin t.log = log t.verboseReports = verboseReportFlag t.timingReports = timingReportFlag + t.w = os.Stdout fileSizeTarget := int64(cadaverSizeTarget) if fileSizeTarget == 0 { @@ -126,21 +131,21 @@ func (t *tracer) setMetadata(metadata tracerMetadata) { func (t *tracer) ein(src, dest stateMachineTag, e event, r round, p period, s step) { t.seq++ if t.level >= all { - // fmt.Printf("%v %3v %23v -> %23v: %30v\n", t.tag, t.seq, src, dest, e) - fmt.Printf("%v] %23v -> %23v: %30v\n", t.tag, src, dest, e) + // fmt.Fprintf(t.w, "%v %3v %23v -> %23v: %30v\n", t.tag, t.seq, src, dest, e) + fmt.Fprintf(t.w, "%v] %23v -> %23v: %30v\n", t.tag, src, dest, e) } } func (t *tracer) eout(src, dest stateMachineTag, e event, r round, p period, s step) { t.seq++ if t.level >= all { - // fmt.Printf("%v %3v %23v <- %23v: %30v\n", t.tag, t.seq, src, dest, e) - fmt.Printf("%v] %23v <- %23v: %30v\n", t.tag, src, dest, e) + // fmt.Fprintf(t.w, "%v %3v %23v <- %23v: %30v\n", t.tag, t.seq, src, dest, e) + fmt.Fprintf(t.w, "%v] %23v <- %23v: %30v\n", t.tag, src, dest, e) } else if t.level >= key { switch e.t() { case proposalAccepted, proposalCommittable, softThreshold, certThreshold, nextThreshold: - // fmt.Printf("%v %3v %23v <- %23v: %30v\n", t.tag, t.seq, src, dest, e) - fmt.Printf("%v] %23v <- %23v: %30v\n", t.tag, src, dest, e) + // fmt.Fprintf(t.w, "%v %3v %23v <- %23v: %30v\n", t.tag, t.seq, src, dest, e) + fmt.Fprintf(t.w, "%v] %23v <- %23v: %30v\n", t.tag, src, dest, e) } } } @@ -148,8 +153,8 @@ func (t *tracer) eout(src, dest stateMachineTag, e event, r round, p period, s s func (t *tracer) ainTop(src, dest stateMachineTag, state player, e event, r round, p period, s step) { t.seq++ if t.level >= top { - // fmt.Printf("%v %3v %23v => %23v: %30v\n", t.tag, t.seq, src, dest, e) - fmt.Printf("%v] %23v => %23v: %30v\n", t.tag, src, dest, e) + // fmt.Fprintf(t.w, "%v %3v %23v => %23v: %30v\n", t.tag, t.seq, src, dest, e) + fmt.Fprintf(t.w, "%v] %23v => %23v: %30v\n", t.tag, src, dest, e) } } @@ -165,8 +170,8 @@ func (t *tracer) aoutTop(src, dest stateMachineTag, as []action, r round, p peri t.seq++ if t.level >= top { - // fmt.Printf("%v %3v %23v <= %23v: %.30v\n", t.tag, t.seq, src, dest, as) - fmt.Printf("%v] %23v <= %23v: %.30v\n", t.tag, src, dest, as) + // fmt.Fprintf(t.w, "%v %3v %23v <= %23v: %.30v\n", t.tag, t.seq, src, dest, as) + fmt.Fprintf(t.w, "%v] %23v <= %23v: %.30v\n", t.tag, src, dest, as) } } diff --git a/debug/coroner/main.go b/debug/coroner/main.go index 44b6b6d9c4..91db4f25d8 100644 --- a/debug/coroner/main.go +++ b/debug/coroner/main.go @@ -26,53 +26,45 @@ import ( "github.com/algorand/go-algorand/agreement" "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/data/basics" ) var numRegex = regexp.MustCompile(`^\d+$`) -var posOffRegex = regexp.MustCompile(`^\+\d+$`) -var negOffRegex = regexp.MustCompile(`^\-\d+$`) -var filename = flag.String("file", "", "Name of the input cadaver file") +var filename = flag.String("file", "", "Name of the input cadaver file (otherwise, use stdin)") var versionCheck = flag.Bool("version", false, "Display current coroner build version and exit") - var printmsgpack = flag.Bool("msgpack", false, "If provided, emit msgpack instead of a string") -// note: these also take relative offsets given by "+" or "-" symbols -// e.g., the command -// coroner --skip-head -10 -// will give the last 10 rounds of the coroner. -// If relative is set, the removal is done relative to the minimum round in the -// trace if the given round is nonnegative. Otherwise, the removal is relative -// to the maximum round in the trace. var skipHead = flag.String("skip-head", "", "The first round to trim before") var skipTail = flag.String("skip-tail", "", "The last round to trim after") -func mustParse(data []byte) int64 { - x, err := strconv.ParseInt(string(data), 10, 64) +func mustParse(data []byte) uint64 { + x, err := strconv.ParseUint(string(data), 10, 64) if err != nil { log.Fatalf(`failed to parse round bound in "%s": %s`, string(data), err) } return x } -func parseRoundBound(s string) (bound int64, relative bool) { - data := []byte(s) - signfact := int64(1) - - switch { - case s == "": - case numRegex.Match(data): - bound = mustParse(numRegex.Find(data)) - case negOffRegex.Match(data): - signfact = -1 - fallthrough - case posOffRegex.Match(data): - relative = true - bound = mustParse(numRegex.Find(data[1:])) * signfact - default: - log.Fatalf(`failed to parse round bound in "%s": string does not match regex "^(+|-)\d+$"`, s) +func parseRoundBound(s string) uint64 { + if !numRegex.Match([]byte(s)) { + log.Fatalf(`failed to parse round bound in "%s": string does not match regex "^\d+$"`, s) + } + return mustParse(numRegex.Find([]byte(s))) +} + +func done(n int, err error) { + if n == 0 { + log.Println("coroner: no cadavers autopsied") + } + + if err != nil { + log.Println("coroner: failed to extract full autopsy trace:", err) } - return +} + +func nextBounds(i int, bounds agreement.AutopsyBounds) { + log.Printf("cadaver seq: %d\tstart(r,p): (%d,%d)\tend(r,p): (%d,%d)\n", i, bounds.StartRound, bounds.StartPeriod, bounds.EndRound, bounds.EndPeriod) } func main() { @@ -89,80 +81,34 @@ func main() { if *filename == "" { log.Println("coroner: no filename provided; reading from stdin...") - autopsy, err = agreement.PrepareAutopsyFromInputStream() + autopsy, err = agreement.PrepareAutopsyFromStream(os.Stdin, nextBounds, done) } else { - autopsy, err = agreement.PrepareAutopsy(*filename) + autopsy, err = agreement.PrepareAutopsy(*filename, nextBounds, done) } if err != nil { log.Fatalln("coroner: failed to prepare autopsy:", err) } defer autopsy.Close() - headRound, headRelative := parseRoundBound(*skipHead) - tailRound, tailRelative := parseRoundBound(*skipTail) - - autopsiedCdvs, err := autopsy.ExtractCdvs() - if err != nil { - log.Println("coroner: failed to extract full autopsy trace:", err) - log.Println("coroner: continuing after error...") - } - - if len(autopsiedCdvs) < 1 { - log.Println("coroner: no cadavers autopsied") - return - } - - firstMeta := autopsiedCdvs[0].M - log.Printf("coroner: Cadaver file generated with commit hash:\n%s\n", firstMeta.VersionCommitHash) - if firstMeta.VersionCommitHash != version.GetCommitHash() { - log.Printf("coroner: Cadaver version mismatches coroner version:\n(%s (cadaver) != %s (coroner))\n", firstMeta.VersionCommitHash, version.GetCommitHash()) - } - - cachedStartRound := autopsiedCdvs[0].StartRound + var filter agreement.AutopsyFilter if *skipHead != "" { - numCdvs := len(autopsiedCdvs) - first := headRound - if headRelative { - if headRound >= 0 { - first = cachedStartRound + headRound - } else { - first = autopsiedCdvs[numCdvs-1].EndRound + headRound - } - } - for i := range autopsiedCdvs { - if autopsiedCdvs[i].EndRound < first { - autopsiedCdvs = autopsiedCdvs[i+1:] - break - } - } - autopsiedCdvs[0].T, autopsiedCdvs[0].StartRound = autopsiedCdvs[0].T.FilterBefore(first) + filter.Enabled = true + filter.First = basics.Round(parseRoundBound(*skipHead)) } if *skipTail != "" { - numCdvs := len(autopsiedCdvs) - last := tailRound - if tailRelative { - if tailRound >= 0 { - last = cachedStartRound + tailRound - } else { - last = autopsiedCdvs[numCdvs-1].EndRound + tailRound - } - } - for i := range autopsiedCdvs { - if autopsiedCdvs[i].StartRound > last { - autopsiedCdvs = autopsiedCdvs[:i] - break - } - } - end := len(autopsiedCdvs) - autopsiedCdvs[end].T, autopsiedCdvs[end].EndRound = autopsiedCdvs[end].T.FilterAfter(last) + filter.Enabled = true + filter.Last = basics.Round(parseRoundBound(*skipTail)) } - for i := range autopsiedCdvs { - log.Printf("Cadaver Seq: %d\tstart: %d\tend: %d\n", i, autopsiedCdvs[i].StartRound, autopsiedCdvs[i].EndRound) - } + var commitHash string if *printmsgpack { - agreement.DumpMessagePack(autopsiedCdvs, os.Stdout) + commitHash = autopsy.DumpMessagePack(filter, os.Stdout) } else { - agreement.DumpString(autopsiedCdvs, os.Stdout) + commitHash = autopsy.DumpString(filter, os.Stdout) } + if commitHash != version.GetCommitHash() { + log.Printf("coroner: cadaver version mismatches coroner version:\n(%s (cadaver) != %s (coroner))\n", commitHash, version.GetCommitHash()) + } + + return }