Merged
Commits (23)
f80e4e2
wip init
brianolson Oct 25, 2022
86645fc
Merge remote-tracking branch 'origin/master' into pp-total-latency
brianolson Nov 3, 2022
ed4801f
working tx latency measurement
brianolson Nov 4, 2022
1fb74bb
Merge remote-tracking branch 'origin/master' into pp-total-latency
brianolson Nov 7, 2022
5138c89
wip
brianolson Nov 8, 2022
430befc
txid trace
brianolson Nov 8, 2022
14f8cc2
standalone tx latency tester
brianolson Nov 9, 2022
c1c01fd
Merge remote-tracking branch 'origin/master' into pp-total-latency
brianolson Nov 18, 2022
a569089
wip debug
brianolson Nov 18, 2022
99ec33c
error messages
brianolson Nov 18, 2022
88a6d45
fix round fetching
brianolson Nov 18, 2022
4f3223e
fix
brianolson Nov 18, 2022
b00d7fc
plot output of pingpong TotalLatencyOut
brianolson Nov 18, 2022
cf3c437
Merge remote-tracking branch 'origin/master' into pp-total-latency
brianolson Dec 8, 2022
e08a71d
cleanup. comment
brianolson Dec 8, 2022
9f8a93b
smaller err restart time so we less likely to oversleep a block
brianolson Dec 8, 2022
9224c5b
Merge remote-tracking branch 'origin/master' into pp-total-latency
brianolson Jan 10, 2023
1c3197b
pingpong run --latency latencylog.gz
brianolson Jan 10, 2023
2a44310
gzip in, statistics out
brianolson Jan 10, 2023
f8c87a4
accumulate data over walking a dir of tars of logs
brianolson Jan 11, 2023
04f778a
latency plot reporting tweaks
brianolson Jan 12, 2023
226f3a6
latency report html mode
brianolson Jan 12, 2023
315c271
code review cleanup
brianolson Jan 19, 2023
6 changes: 6 additions & 0 deletions cmd/pingpong/runCmd.go
@@ -90,6 +90,7 @@ var generatedAccountsCount uint64
var generatedAccountsOffset uint64
var generatedAccountSampleMethod string
var configPath string
var latencyPath string

func init() {
rootCmd.AddCommand(runCmd)
@@ -111,6 +112,7 @@ func init() {
runCmd.Flags().StringVar(&refreshTime, "refresh", "", "Duration of time (seconds) between refilling accounts with money (0 means no refresh)")
runCmd.Flags().StringVar(&logicProg, "program", "", "File containing the compiled program to include as a logic sig")
runCmd.Flags().StringVar(&configPath, "config", "", "path to read config json from, or json literal")
runCmd.Flags().StringVar(&latencyPath, "latency", "", "path to write txn latency log to (.gz for compressed)")
runCmd.Flags().BoolVar(&saveConfig, "save", false, "Save the effective configuration to disk")
runCmd.Flags().BoolVar(&useDefault, "reset", false, "Reset to the default configuration (not read from disk)")
runCmd.Flags().BoolVar(&quietish, "quiet", false, "quietish stdout logging")
@@ -440,6 +442,10 @@ var runCmd = &cobra.Command{
reportErrorf("numAccounts is greater than number of account mnemonics provided")
}

if latencyPath != "" {
cfg.TotalLatencyOut = latencyPath
}

cfg.SetDefaultWeights()
err = cfg.Check()
if err != nil {
1 change: 1 addition & 0 deletions shared/pingpong/config.go
@@ -48,6 +48,7 @@ type PpConfig struct {
Quiet bool
RandomNote bool
RandomLease bool
TotalLatencyOut string

Program []byte
LogicArgs [][]byte
191 changes: 190 additions & 1 deletion shared/pingpong/pingpong.go
@@ -17,10 +17,13 @@
package pingpong

import (
"bufio"
"compress/gzip"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"math/rand"
"os"
@@ -34,6 +37,7 @@ import (
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/daemon/algod/api/server/v2/generated/model"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/libgoal"
@@ -128,6 +132,11 @@ func (ppa *pingPongAccount) String() string {
return ow.String()
}

type txidSendTime struct {
txid string
when time.Time
}

// WorkerState object holds a running pingpong worker
type WorkerState struct {
cfg PpConfig
@@ -149,6 +158,11 @@ type WorkerState struct {
refreshPos int

client *libgoal.Client

// TotalLatencyOut stuff
sentTxid chan txidSendTime
latencyBlocks chan bookkeeping.Block
latencyOuts []io.Writer // latencyOuts is a chain of *os.File, gzip, etc. Write to last element. .Close() last to first.
}

// returns the number of boxes per app
@@ -345,6 +359,25 @@ func (pps *WorkerState) schedule(n int) {
//fmt.Printf("schedule now=%s next=%s\n", now, pps.nextSendTime)
}

func (pps *WorkerState) recordTxidSent(txid string, err error) {
if err != nil {
return
}
if pps.sentTxid == nil {
return
}
rec := txidSendTime{
txid: txid,
when: time.Now(),
}
select {
case pps.sentTxid <- rec:
// ok!
default:
// drop, oh well
}
}

func (pps *WorkerState) fundAccounts(client *libgoal.Client) error {
var srcFunds, minFund uint64
var err error
@@ -545,6 +578,9 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac *libgoal.Client) {
// error = fundAccounts()
// }

if pps.cfg.TotalLatencyOut != "" {
pps.startTxLatency(ctx, ac)
}
pps.nextSendTime = time.Now()
ac.SetSuggestedParamsCacheAge(200 * time.Millisecond)
pps.client = ac
@@ -773,7 +809,9 @@ func (pps *WorkerState) sendFromTo(

sentCount++
pps.schedule(1)
_, sendErr = client.BroadcastTransaction(stxn)
var txid string
txid, sendErr = client.BroadcastTransaction(stxn)
pps.recordTxidSent(txid, sendErr)
} else {
// Generate txn group

@@ -844,6 +882,8 @@ func (pps *WorkerState) sendFromTo(
sentCount += uint64(len(txGroup))
pps.schedule(len(txGroup))
sendErr = client.BroadcastTransactionGroup(stxGroup)
txid := txGroup[0].ID().String()
pps.recordTxidSent(txid, sendErr)
}

if sendErr != nil {
@@ -1298,3 +1338,152 @@ func signTxn(signer *pingPongAccount, txn transactions.Transaction, cfg PpConfig
}
return
}

func (pps *WorkerState) startTxLatency(ctx context.Context, ac *libgoal.Client) {
fout, err := os.Create(pps.cfg.TotalLatencyOut)
if err != nil {
fmt.Fprintf(os.Stderr, "%s: %v", pps.cfg.TotalLatencyOut, err)
return
}
pps.latencyOuts = append(pps.latencyOuts, fout)
if strings.HasSuffix(pps.cfg.TotalLatencyOut, ".gz") {
gzout := gzip.NewWriter(fout)
pps.latencyOuts = append(pps.latencyOuts, gzout)
} else {
bw := bufio.NewWriter(fout)
pps.latencyOuts = append(pps.latencyOuts, bw)
}
pps.sentTxid = make(chan txidSendTime, 1000)
pps.latencyBlocks = make(chan bookkeeping.Block, 1)
go pps.txidLatency(ctx)
go pps.txidLatencyBlockWaiter(ctx, ac)
}

type txidSendTimeIndexed struct {
txidSendTime
index int
}

const txidLatencySampleSize = 10000

// thread which handles measuring total send-to-commit latency
func (pps *WorkerState) txidLatency(ctx context.Context) {
byTxid := make(map[string]txidSendTimeIndexed, txidLatencySampleSize)
txidList := make([]string, 0, txidLatencySampleSize)
out := pps.latencyOuts[len(pps.latencyOuts)-1]
for {
select {
case st := <-pps.sentTxid:
Contributor:
True sampling should be done on the pps.sentTxid writer side. Otherwise the 10k samples will be fully overwritten in a few rounds under full TPS.
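
As an aside (not part of this diff): a minimal sketch of what sender-side sampling could look like, reusing the PR's txidSendTime type and sentTxid channel; the sampleProb parameter and the function itself are hypothetical, not something this change adds.

// Hypothetical sketch: sample before writing to pps.sentTxid so a fixed-size
// window is not fully churned through every round at full TPS.
func (pps *WorkerState) recordTxidSentSampled(txid string, err error, sampleProb float64) {
	if err != nil || pps.sentTxid == nil {
		return
	}
	if rand.Float64() >= sampleProb {
		return // record only roughly sampleProb of all sends
	}
	select {
	case pps.sentTxid <- txidSendTime{txid: txid, when: time.Now()}:
	default:
		// channel full; drop, as the PR's recordTxidSent does
	}
}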

if len(txidList) < txidLatencySampleSize {
index := len(txidList)
txidList = append(txidList, st.txid)
byTxid[st.txid] = txidSendTimeIndexed{
st,
index,
}
} else {
// random replacement
evict := rand.Intn(len(txidList))
delete(byTxid, txidList[evict])
txidList[evict] = st.txid
byTxid[st.txid] = txidSendTimeIndexed{
st,
evict,
}
}
Comment on lines +1376 to +1393
Contributor:
I really like this random replacement scheme, just thinking out loud -- if your sample size is smaller than the number of data points, why not just do a circular buffer? Advantages I see would be that the datapoints would still be well-ordered and you would not be missing any data for the range of time the sample was collected. The way it works now makes it so that the most recent datapoints are most likely to be included and the least recent datapoints are least likely to be included, which would also be the case with a circular buffer.

Contributor (author):
If the rate is larger than the buffer, then a circular buffer could lose almost all the data. With a buffer of 10_000 but 26_000 transactions in a block, it would only know about the most recent transactions and only measure their latency. Better to measure over a longer duration.

Contributor:
sorry why not make it 26000 then?

Contributor (author):
Old habit from working in RAM-scarce environments. And to make up some more justification: maybe I don't even want to log all of the txns, but just a sample, because we also don't need to process a full 6000 TPS of this data.
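
As a rough back-of-the-envelope check on that trade-off (an illustration, not part of the PR; k = 10,000 slots and n = 26,000 sends per round are the figures quoted above): under random replacement, an entry written at the start of a round survives the whole round with probability about (1 - 1/k)^n ≈ e^(-n/k) ≈ 0.07, whereas a circular buffer of the same size would have overwritten it with certainty.

package main

import (
	"fmt"
	"math"
)

func main() {
	const k = 10000.0 // sample slots (txidLatencySampleSize)
	const n = 26000.0 // sends per round in the example above

	// Random replacement: each later insert evicts a given entry with probability 1/k,
	// so an entry written at the start of the round survives it with (1-1/k)^n.
	survivalRandom := math.Pow(1-1/k, n) // ≈ e^(-2.6) ≈ 0.074

	// Circular buffer: only the newest k entries remain, so anything older than
	// the last k sends of the round survives with probability 0.
	survivalCircular := 0.0

	fmt.Printf("oldest-entry survival: random replacement %.3f, circular buffer %.3f\n",
		survivalRandom, survivalCircular)
}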

case bl := <-pps.latencyBlocks:
now := time.Now()
txns, err := bl.DecodePaysetFlat()
if err != nil {
fmt.Fprintf(os.Stderr, "block[%d] payset err %v", bl.Round(), err)
return
}
for _, stxn := range txns {
txid := stxn.ID().String()
st, ok := byTxid[txid]
if ok {
dt := now.Sub(st.when)
fmt.Fprintf(out, "%d\n", dt.Nanoseconds())
}
}
case <-ctx.Done():
return
}
}
}

type flusher interface {
Flush() error
}

func (pps *WorkerState) txidLatencyDone() {
for i := len(pps.latencyOuts) - 1; i >= 0; i-- {
xo := pps.latencyOuts[i]
if fl, ok := xo.(flusher); ok {
err := fl.Flush()
if err != nil {
fmt.Fprintf(os.Stderr, "%s: %v", pps.cfg.TotalLatencyOut, err)
}
}
if cl, ok := xo.(io.Closer); ok {
err := cl.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "%s: %v", pps.cfg.TotalLatencyOut, err)
}
}
}
}

const errRestartTime = time.Second

func (pps *WorkerState) txidLatencyBlockWaiter(ctx context.Context, ac *libgoal.Client) {
defer close(pps.latencyBlocks)
done := ctx.Done()
isDone := func(err error) bool {
select {
case <-done:
return true
default:
}
fmt.Fprintf(os.Stderr, "block waiter st : %v", err)
time.Sleep(errRestartTime)
return false
}
restart:
select {
case <-done:
return
default:
}
st, err := ac.Status()
if err != nil {
if isDone(err) {
return
}
goto restart
}
nextRound := st.LastRound
for {
select {
case <-done:
return
default:
}
st, err = ac.WaitForRound(nextRound)
if err != nil {
if isDone(err) {
return
}
goto restart
}
bb, err := ac.BookkeepingBlock(st.LastRound)
if err != nil {
if isDone(err) {
return
}
goto restart
}
pps.latencyBlocks <- bb
nextRound = st.LastRound
}
}
7 changes: 7 additions & 0 deletions test/heapwatch/block_history.py
@@ -22,6 +22,7 @@
# pip install py-algorand-sdk

import argparse
import atexit
import base64
import logging
import os
@@ -231,6 +232,7 @@ def main():
ap.add_argument('-t', '--token', default=None, help='algod API access token')
ap.add_argument('--header', dest='headers', nargs='*', help='"Name: value" HTTP header (repeatable)')
ap.add_argument('--all', default=False, action='store_true', help='fetch all blocks from 0')
ap.add_argument('--pid')
ap.add_argument('--verbose', default=False, action='store_true')
ap.add_argument('-o', '--out', default=None, help='file to append json lines to')
args = ap.parse_args()
@@ -240,6 +242,11 @@
else:
logging.basicConfig(level=logging.INFO)

if args.pid:
with open(args.pid, 'w') as fout:
fout.write('{}'.format(os.getpid()))
atexit.register(os.remove, args.pid)

algorand_data = args.algod or os.getenv('ALGORAND_DATA')
if not algorand_data and not ((args.token or args.headers) and args.addr):
sys.stderr.write('must specify algod data dir by $ALGORAND_DATA or -d/--algod; OR --a/--addr and -t/--token\n')
8 changes: 7 additions & 1 deletion test/heapwatch/block_history_plot.py
@@ -25,6 +25,7 @@
import base64
import os
import statistics
import sys

from algosdk.encoding import msgpack
from matplotlib import pyplot as plt
@@ -65,7 +66,10 @@ def process(path, args):
block = row['block']
rnd = block.get('rnd',0)
if (rnd < minrnd) or ((maxrnd is not None) and (rnd > maxrnd)):
sys.stderr.write(f'skip rnd {rnd}\n')
continue
if (prevrnd is not None) and (rnd <= prevrnd):
sys.stderr.write(f'wat rnd {rnd}, prevrnd {prevrnd}, line {count}\n')
tc = block.get('tc', 0)
ts = block.get('ts', 0) # timestamp recorded at algod, 1s resolution int
_time = row['_time'] # timestamp recorded at client, 0.000001s resolution float
@@ -82,6 +86,8 @@
tsv.append(_time)
if dt > 0.5:
dtxn = tc - prevtc
if dtxn < 0:
sys.stderr.write(f'{path}:{count} tc {tc}, prevtc {prevtc}, rnd {rnd}, prevrnd {prevrnd}\n')
tps = dtxn / dt
mintxn = min(dtxn,mintxn)
maxtxn = max(dtxn,maxtxn)
@@ -93,7 +99,7 @@
dtv.append(dt)
txnv.append(dtxn)
else:
print('b[{}] - b[{}], dt={}'.format(rnd-1,rnd,dt))
sys.stderr.write('b[{}] - b[{}], dt={}\n'.format(rnd-1,rnd,dt))
else:
tsv.append(ts)
prevrnd = rnd