diff --git a/cmd/pingpong/runCmd.go b/cmd/pingpong/runCmd.go index ac95b16e07..cf998f4c7e 100644 --- a/cmd/pingpong/runCmd.go +++ b/cmd/pingpong/runCmd.go @@ -90,6 +90,7 @@ var generatedAccountsCount uint64 var generatedAccountsOffset uint64 var generatedAccountSampleMethod string var configPath string +var latencyPath string func init() { rootCmd.AddCommand(runCmd) @@ -111,6 +112,7 @@ func init() { runCmd.Flags().StringVar(&refreshTime, "refresh", "", "Duration of time (seconds) between refilling accounts with money (0 means no refresh)") runCmd.Flags().StringVar(&logicProg, "program", "", "File containing the compiled program to include as a logic sig") runCmd.Flags().StringVar(&configPath, "config", "", "path to read config json from, or json literal") + runCmd.Flags().StringVar(&latencyPath, "latency", "", "path to write txn latency log to (.gz for compressed)") runCmd.Flags().BoolVar(&saveConfig, "save", false, "Save the effective configuration to disk") runCmd.Flags().BoolVar(&useDefault, "reset", false, "Reset to the default configuration (not read from disk)") runCmd.Flags().BoolVar(&quietish, "quiet", false, "quietish stdout logging") @@ -440,6 +442,10 @@ var runCmd = &cobra.Command{ reportErrorf("numAccounts is greater than number of account mnemonics provided") } + if latencyPath != "" { + cfg.TotalLatencyOut = latencyPath + } + cfg.SetDefaultWeights() err = cfg.Check() if err != nil { diff --git a/shared/pingpong/config.go b/shared/pingpong/config.go index 970d5812f6..32f5fab7e9 100644 --- a/shared/pingpong/config.go +++ b/shared/pingpong/config.go @@ -48,6 +48,7 @@ type PpConfig struct { Quiet bool RandomNote bool RandomLease bool + TotalLatencyOut string Program []byte LogicArgs [][]byte diff --git a/shared/pingpong/pingpong.go b/shared/pingpong/pingpong.go index e3574fb5b2..9e6dc27c26 100644 --- a/shared/pingpong/pingpong.go +++ b/shared/pingpong/pingpong.go @@ -17,10 +17,13 @@ package pingpong import ( + "bufio" + "compress/gzip" "context" "encoding/binary" "errors" "fmt" + "io" "math" "math/rand" "os" @@ -34,6 +37,7 @@ import ( "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/daemon/algod/api/server/v2/generated/model" "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/data/transactions/logic" "github.com/algorand/go-algorand/libgoal" @@ -128,6 +132,11 @@ func (ppa *pingPongAccount) String() string { return ow.String() } +type txidSendTime struct { + txid string + when time.Time +} + // WorkerState object holds a running pingpong worker type WorkerState struct { cfg PpConfig @@ -149,6 +158,11 @@ type WorkerState struct { refreshPos int client *libgoal.Client + + // TotalLatencyOut stuff + sentTxid chan txidSendTime + latencyBlocks chan bookkeeping.Block + latencyOuts []io.Writer // latencyOuts is a chain of *os.File, gzip, etc. Write to last element. .Close() last to first. } // returns the number of boxes per app @@ -345,6 +359,25 @@ func (pps *WorkerState) schedule(n int) { //fmt.Printf("schedule now=%s next=%s\n", now, pps.nextSendTime) } +func (pps *WorkerState) recordTxidSent(txid string, err error) { + if err != nil { + return + } + if pps.sentTxid == nil { + return + } + rec := txidSendTime{ + txid: txid, + when: time.Now(), + } + select { + case pps.sentTxid <- rec: + // ok! + default: + // drop, oh well + } +} + func (pps *WorkerState) fundAccounts(client *libgoal.Client) error { var srcFunds, minFund uint64 var err error @@ -545,6 +578,9 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac *libgoal.Client) { // error = fundAccounts() // } + if pps.cfg.TotalLatencyOut != "" { + pps.startTxLatency(ctx, ac) + } pps.nextSendTime = time.Now() ac.SetSuggestedParamsCacheAge(200 * time.Millisecond) pps.client = ac @@ -773,7 +809,9 @@ func (pps *WorkerState) sendFromTo( sentCount++ pps.schedule(1) - _, sendErr = client.BroadcastTransaction(stxn) + var txid string + txid, sendErr = client.BroadcastTransaction(stxn) + pps.recordTxidSent(txid, sendErr) } else { // Generate txn group @@ -844,6 +882,8 @@ func (pps *WorkerState) sendFromTo( sentCount += uint64(len(txGroup)) pps.schedule(len(txGroup)) sendErr = client.BroadcastTransactionGroup(stxGroup) + txid := txGroup[0].ID().String() + pps.recordTxidSent(txid, sendErr) } if sendErr != nil { @@ -1298,3 +1338,152 @@ func signTxn(signer *pingPongAccount, txn transactions.Transaction, cfg PpConfig } return } + +func (pps *WorkerState) startTxLatency(ctx context.Context, ac *libgoal.Client) { + fout, err := os.Create(pps.cfg.TotalLatencyOut) + if err != nil { + fmt.Fprintf(os.Stderr, "%s: %v", pps.cfg.TotalLatencyOut, err) + return + } + pps.latencyOuts = append(pps.latencyOuts, fout) + if strings.HasSuffix(pps.cfg.TotalLatencyOut, ".gz") { + gzout := gzip.NewWriter(fout) + pps.latencyOuts = append(pps.latencyOuts, gzout) + } else { + bw := bufio.NewWriter(fout) + pps.latencyOuts = append(pps.latencyOuts, bw) + } + pps.sentTxid = make(chan txidSendTime, 1000) + pps.latencyBlocks = make(chan bookkeeping.Block, 1) + go pps.txidLatency(ctx) + go pps.txidLatencyBlockWaiter(ctx, ac) +} + +type txidSendTimeIndexed struct { + txidSendTime + index int +} + +const txidLatencySampleSize = 10000 + +// thread which handles measuring total send-to-commit latency +func (pps *WorkerState) txidLatency(ctx context.Context) { + byTxid := make(map[string]txidSendTimeIndexed, txidLatencySampleSize) + txidList := make([]string, 0, txidLatencySampleSize) + out := pps.latencyOuts[len(pps.latencyOuts)-1] + for { + select { + case st := <-pps.sentTxid: + if len(txidList) < txidLatencySampleSize { + index := len(txidList) + txidList = append(txidList, st.txid) + byTxid[st.txid] = txidSendTimeIndexed{ + st, + index, + } + } else { + // random replacement + evict := rand.Intn(len(txidList)) + delete(byTxid, txidList[evict]) + txidList[evict] = st.txid + byTxid[st.txid] = txidSendTimeIndexed{ + st, + evict, + } + } + case bl := <-pps.latencyBlocks: + now := time.Now() + txns, err := bl.DecodePaysetFlat() + if err != nil { + fmt.Fprintf(os.Stderr, "block[%d] payset err %v", bl.Round(), err) + return + } + for _, stxn := range txns { + txid := stxn.ID().String() + st, ok := byTxid[txid] + if ok { + dt := now.Sub(st.when) + fmt.Fprintf(out, "%d\n", dt.Nanoseconds()) + } + } + case <-ctx.Done(): + return + } + } +} + +type flusher interface { + Flush() error +} + +func (pps *WorkerState) txidLatencyDone() { + for i := len(pps.latencyOuts); i >= 0; i-- { + xo := pps.latencyOuts[i] + if fl, ok := xo.(flusher); ok { + err := fl.Flush() + if err != nil { + fmt.Fprintf(os.Stderr, "%s: %v", pps.cfg.TotalLatencyOut, err) + } + } + if cl, ok := xo.(io.Closer); ok { + err := cl.Close() + if err != nil { + fmt.Fprintf(os.Stderr, "%s: %v", pps.cfg.TotalLatencyOut, err) + } + } + } +} + +const errRestartTime = time.Second + +func (pps *WorkerState) txidLatencyBlockWaiter(ctx context.Context, ac *libgoal.Client) { + defer close(pps.latencyBlocks) + done := ctx.Done() + isDone := func(err error) bool { + select { + case <-done: + return true + default: + } + fmt.Fprintf(os.Stderr, "block waiter st : %v", err) + time.Sleep(errRestartTime) + return false + } +restart: + select { + case <-done: + return + default: + } + st, err := ac.Status() + if err != nil { + if isDone(err) { + return + } + goto restart + } + nextRound := st.LastRound + for { + select { + case <-done: + return + default: + } + st, err = ac.WaitForRound(nextRound) + if err != nil { + if isDone(err) { + return + } + goto restart + } + bb, err := ac.BookkeepingBlock(st.LastRound) + if err != nil { + if isDone(err) { + return + } + goto restart + } + pps.latencyBlocks <- bb + nextRound = st.LastRound + } +} diff --git a/test/heapwatch/block_history.py b/test/heapwatch/block_history.py index 002a0cdc09..c7fa3ce6f5 100644 --- a/test/heapwatch/block_history.py +++ b/test/heapwatch/block_history.py @@ -22,6 +22,7 @@ # pip install py-algorand-sdk import argparse +import atexit import base64 import logging import os @@ -231,6 +232,7 @@ def main(): ap.add_argument('-t', '--token', default=None, help='algod API access token') ap.add_argument('--header', dest='headers', nargs='*', help='"Name: value" HTTP header (repeatable)') ap.add_argument('--all', default=False, action='store_true', help='fetch all blocks from 0') + ap.add_argument('--pid') ap.add_argument('--verbose', default=False, action='store_true') ap.add_argument('-o', '--out', default=None, help='file to append json lines to') args = ap.parse_args() @@ -240,6 +242,11 @@ def main(): else: logging.basicConfig(level=logging.INFO) + if args.pid: + with open(args.pid, 'w') as fout: + fout.write('{}'.format(os.getpid())) + atexit.register(os.remove, args.pid) + algorand_data = args.algod or os.getenv('ALGORAND_DATA') if not algorand_data and not ((args.token or args.headers) and args.addr): sys.stderr.write('must specify algod data dir by $ALGORAND_DATA or -d/--algod; OR --a/--addr and -t/--token\n') diff --git a/test/heapwatch/block_history_plot.py b/test/heapwatch/block_history_plot.py index e491170ca8..03f5a45221 100644 --- a/test/heapwatch/block_history_plot.py +++ b/test/heapwatch/block_history_plot.py @@ -25,6 +25,7 @@ import base64 import os import statistics +import sys from algosdk.encoding import msgpack from matplotlib import pyplot as plt @@ -65,7 +66,10 @@ def process(path, args): block = row['block'] rnd = block.get('rnd',0) if (rnd < minrnd) or ((maxrnd is not None) and (rnd > maxrnd)): + sys.stderr.write(f'skip rnd {rnd}\n') continue + if (prevrnd is not None) and (rnd <= prevrnd): + sys.stderr.write(f'wat rnd {rnd}, prevrnd {prevrnd}, line {count}\n') tc = block.get('tc', 0) ts = block.get('ts', 0) # timestamp recorded at algod, 1s resolution int _time = row['_time'] # timestamp recorded at client, 0.000001s resolution float @@ -82,6 +86,8 @@ def process(path, args): tsv.append(_time) if dt > 0.5: dtxn = tc - prevtc + if dtxn < 0: + sys.stderr.write(f'{path}:{count} tc {tc}, prevtc {prevtc}, rnd {rnd}, prevrnd {prevrnd}\n') tps = dtxn / dt mintxn = min(dtxn,mintxn) maxtxn = max(dtxn,maxtxn) @@ -93,7 +99,7 @@ def process(path, args): dtv.append(dt) txnv.append(dtxn) else: - print('b[{}] - b[{}], dt={}'.format(rnd-1,rnd,dt)) + sys.stderr.write('b[{}] - b[{}], dt={}\n'.format(rnd-1,rnd,dt)) else: tsv.append(ts) prevrnd = rnd diff --git a/test/scripts/e2e_tx_latency.py b/test/scripts/e2e_tx_latency.py new file mode 100644 index 0000000000..b181002d90 --- /dev/null +++ b/test/scripts/e2e_tx_latency.py @@ -0,0 +1,400 @@ +#!/usr/bin/env python3 +# +# Measure total percieved tx latency. +# Submit transactions to algod, watch blocks for committed transaction. + +import argparse +import atexit +import base64 +import datetime +import glob +import json +import logging +import os +import queue +import re +import shutil +import statistics +import subprocess +import sys +import tempfile +import time +import threading + +# pip install py-algorand-sdk +import algosdk +from algosdk.encoding import msgpack +import algosdk.v2client +import algosdk.v2client.algod + +logger = logging.getLogger(__name__) + +def openkmd(algodata): + kmdnetpath = sorted(glob.glob(os.path.join(algodata,'kmd-*','kmd.net')))[-1] + kmdnet = open(kmdnetpath, 'rt').read().strip() + kmdtokenpath = sorted(glob.glob(os.path.join(algodata,'kmd-*','kmd.token')))[-1] + kmdtoken = open(kmdtokenpath, 'rt').read().strip() + kmd = algosdk.kmd.KMDClient(kmdtoken, 'http://' + kmdnet) + return kmd + +def openalgod(algodata): + algodnetpath = os.path.join(algodata,'algod.net') + algodnet = open(algodnetpath, 'rt').read().strip() + algodtokenpath = os.path.join(algodata,'algod.token') + algodtoken = open(algodtokenpath, 'rt').read().strip() + algod = algosdk.v2client.algod.AlgodClient(algodtoken, 'http://' + algodnet) + return algod + +def addr_token_from_algod(algorand_data): + with open(os.path.join(algorand_data, 'algod.net')) as fin: + addr = fin.read().strip() + with open(os.path.join(algorand_data, 'algod.token')) as fin: + token = fin.read().strip() + if not addr.startswith('http'): + addr = 'http://' + addr + return addr, token + +def bstr(x): + if isinstance(x, bytes): + try: + return x.decode() + except: + pass + return x + +def obnice(ob): + if isinstance(ob, dict): + return {bstr(k):obnice(v) for k,v in ob.items()} + if isinstance(ob, list): + return [obnice(x) for x in ob] + return ob + +class TxLatencyTest: + def __init__(self, args, algorand_data=None, prev_round=None, out=None): + self.algorand_data = algorand_data + self.token = args.token + self.addr = args.addr + self.headers = header_list_to_dict(args.headers) + self.prev_round = prev_round + self.out = out or sys.stdout + self.lock = threading.Lock() + self.terminated = None + self._kmd = None + self._algod = None + self.privatekey = None + self.pubw = None + self.maxpubaddr = None + self.errors = [] + self.statuses = [] + self.jsonfile = None + self.sentq = queue.Queue() + self.txidq = queue.Queue() + self.period = 1/args.tps + self.sendcount = args.sendcount + self.go = True + self.roundTimes = {} + self.txTimes = [] + return + + def connect(self): + with self.lock: + self._connect() + return self._algod, self._kmd + + def _connect(self): + if self._algod and self._kmd: + return + + # should run from inside self.lock + algodata = self.algorand_data + + logger.debug('pre kmd') + subprocess.run(['goal', 'kmd', 'start', '-t', '3600', '-d', algodata], timeout=5, check=True) + logger.debug('post kmd') + self._kmd = openkmd(algodata) + self._algod = self._algod_connect() #openalgod(algodata) + + def algod(self): + with self.lock: + if self._algod is None: + self._algod = self._algod_connect() + return self._algod + + def _algod_connect(self): + if self.algorand_data: + addr, token = addr_token_from_algod(self.algorand_data) + logger.debug('algod from %r, (%s %s)', self.algorand_data, addr, token) + else: + token = self.token + addr = self.addr + logger.debug('algod from args (%s %s)', self.addr, self.token) + self._algod = algosdk.v2client.algod.AlgodClient(token, addr, headers=self.headers) + return self._algod + + def get_pub_wallet(self): + with self.lock: + self._connect() + if not (self.pubw and self.maxpubaddr): + # find private test node public wallet and its richest account + wallets = self._kmd.list_wallets() + pubwid = None + for xw in wallets: + if xw['name'] == 'unencrypted-default-wallet': + pubwid = xw['id'] + pubw = self._kmd.init_wallet_handle(pubwid, '') + pubaddrs = self._kmd.list_keys(pubw) + maxamount = 0 + maxpubaddr = None + for pa in pubaddrs: + pai = self._algod.account_info(pa) + if pai['amount'] > maxamount: + maxamount = pai['amount'] + maxpubaddr = pai['address'] + self.pubw = pubw + self.maxpubaddr = maxpubaddr + return self.pubw, self.maxpubaddr + + def send_thread(self): + #opriv, opub = algosdk.account.generate_account() + algod = self.algod() + nextsend = time.time() + params = algod.suggested_params() + paramsMtime = nextsend + count = 0 + + while True: + txn = algosdk.transaction.PaymentTxn(self.maxpubaddr, 1000, params.first, params.last, params.gh, self.maxpubaddr, 1, gen=params.gen, flat_fee=True, note='{}_'.format(count).encode() + os.getrandom(8)) + ptxid = txn.get_txid() + if self.privatekey: + stxn = txn.sign(self.privatekey) + else: + stxn = self._kmd.sign_transaction(self.pubw, '', txn) + txid = algod.send_transaction(stxn) + if ptxid != txid: + logger.error('python txid %s, API txid %s', ptxid, txid) + logger.debug('%r', txn.dictify()) + sendt = time.time() + logger.debug('sent %s %f', txid, sendt) + self.sentq.put((txid, sendt)) + + if self.sendcount is not None: + self.sendcount -= 1 + if self.sendcount <= 0: + # signal to consumer end of sending + self.sentq.put((None, None)) + return + + if sendt - paramsMtime > 5: + params = algod.suggested_params() + paramsMtime = sendt + + while nextsend < sendt: + nextsend += self.period + time.sleep(nextsend - sendt) + + def measure_thread(self): + lastround = self.prev_round + algod = self.algod() + while self.go: + b = self.nextblock(lastround) + if b is None: + print("got None nextblock. exiting") + return + b = msgpack.loads(b, strict_map_key=False, raw=True) + b = obnice(b) + nowround = b['block'].get('rnd', 0) + logger.debug('r%d', nowround) + if (lastround is not None) and (nowround != lastround + 1): + logger.info('round jump %d to %d', lastround, nowround) + self._block_handler(b) + lastround = nowround + + def nextblock(self, lastround=None, retries=30): + trycount = 0 + while (trycount < retries) and self.go: + trycount += 1 + try: + return self._nextblock_inner(lastround) + except Exception as e: + if trycount >= retries: + logger.error('too many errors in nextblock retries') + raise + else: + logger.warning('error in nextblock(%r) (retrying): %s', lastround, e) + self._algod = None # retry with a new connection + time.sleep(1.2) + return None + + def _nextblock_inner(self, lastround): + self.block_time = None + algod = self.algod() + if lastround is None: + status = algod.status() + lastround = status['last-round'] + logger.debug('nextblock status last-round %s', lastround) + else: + try: + blk = algod.block_info(lastround + 1, response_format='msgpack') + if blk: + return blk + logger.warning('null block %d, lastround=%r', lastround+1, lastround) + except Exception as e: + pass + #logger.debug('could not get block %d: %s', lastround + 1, e, exc_info=True) + status = algod.status_after_block(lastround) + block_time = time.time() # the block has happened, don't count block data transit time + nbr = status['last-round'] + retries = 30 + while (nbr > lastround + 1) and self.go: + # if more than one block elapsed, we don't have a good time for either block + block_time = None + # try lastround+1 one last time + try: + blk = algod.block_info(lastround + 1, response_format='msgpack') + if blk: + return blk + logger.warning('null block %d, lastround=%r, status.last-round=%d', lastround+1, lastround, nbr) + time.sleep(1.1) + retries -= 1 + if retries <= 0: + raise Exception("too many null block for %d", lastround+1) + except: + break + blk = algod.block_info(nbr, response_format='msgpack') + if blk: + self.block_time = block_time + return blk + raise Exception('got None for blk {}'.format(nbr)) + + def _block_handler(self, b): + block_time = self.block_time or time.time() + nowround = b['block'].get('rnd', 0) + self.roundTimes[nowround] = block_time + # throw away txns, count is kept in round differential ['block']['tc'] + stxibs = b['block'].get('txns', []) + txids = [] + for stxib in stxibs: + txn = stxib['txn'] + #logger.debug('stxib.txn %r', txn) + hgi = stxib.pop('hgi', False) + if hgi: + txn['gh'] = b['block']['gh'] + txn['gen'] = bstr(b['block']['gen']) + txnd = txn + txn = algosdk.transaction.Transaction.undictify(txn) + txid = txn.get_txid() + logger.debug('rx txn %r, txid %s', txnd, txid) + txids.append(txid) + self.txidq.put((txids, block_time)) + + def join_thread(self): + lastSendt = None + sentq = self.sentq + sentByTxid = {} + while self.go: + if sentq is not None: + try: + txid, sendt = self.sentq.get(block=True, timeout=0.2) + if sendt is None: + sentq = None + else: + lastSendt = sendt + sentByTxid[txid] = sendt + except queue.Empty: + pass + elif (lastSendt is None) or (time.time() > (lastSendt + 8)): + self.go = False + return + try: + txids, rxt = self.txidq.get(block=False) + for txid in txids: + sendt = sentByTxid.get(txid) + if sendt is not None: + dt = rxt - sendt + self.txTimes.append(dt) + logger.debug('rx %s %f', txid, dt) + self.out.write('{}\n'.format(dt)) + else: + logger.debug('unk blk txid %r', txid) + except queue.Empty: + pass + +def header_list_to_dict(hlist): + if not hlist: + return None + p = re.compile(r':\s+') + out = {} + for x in hlist: + a, b = p.split(x, 1) + out[a] = b + return out + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument('-d', '--algod', default=None, help='algod data dir') + ap.add_argument('-a', '--addr', default=None, help='algod host:port address') + ap.add_argument('-t', '--token', default=None, help='algod API access token') + ap.add_argument('--header', dest='headers', nargs='*', help='"Name: value" HTTP header (repeatable)') + ap.add_argument('--tps', type=float, default=5, help='TPS to send at') + ap.add_argument('--sendcount', type=int, default=50, help='number of test txns to send at 5 TPS') + ap.add_argument('--verbose', default=False, action='store_true') + args = ap.parse_args() + + if args.verbose: + logging.basicConfig(level=logging.DEBUG) + else: + logging.basicConfig(level=logging.INFO) + + algorand_data = args.algod or os.getenv('ALGORAND_DATA') + if not algorand_data and not ((args.token or args.headers) and args.addr): + sys.stderr.write('must specify algod data dir by $ALGORAND_DATA or -d/--algod; OR --a/--addr and -t/--token\n') + sys.exit(1) + + out = sys.stdout + + bot = TxLatencyTest( + args, + algorand_data, + prev_round=None, + out=out, + ) + + pubw, maxpubaddr = bot.get_pub_wallet() + logger.debug('get pub wallet -> %s %s', pubw, maxpubaddr) + + sender = threading.Thread(target=bot.send_thread) + sender.start() + measure = threading.Thread(target=bot.measure_thread) + measure.start() + merge = threading.Thread(target=bot.join_thread) + merge.start() + sender.join() + measure.join() + merge.join() + + txTimes = bot.txTimes + rounds = sorted(bot.roundTimes.keys()) + prevRound = None + prevRoundTime = None + roundDts = [] + for rnd in rounds: + rndTime = bot.roundTimes[rnd] + if (prevRound is not None) and (prevRound + 1 == rnd): + dt = rndTime - prevRoundTime + roundDts.append(dt) + prevRound = rnd + prevRoundTime = rndTime + roundTimeMean = statistics.mean(roundDts) + roundTimeMin = min(roundDts) + roundTimeMax = max(roundDts) + txMean = statistics.mean(txTimes) + tmin = min(txTimes) + tmax = max(txTimes) + out.write('# {} txns measured, {} rounds seen\n'.format(len(txTimes), len(rounds))) + out.write('# rnd (min={}, mean={}, max={})\n'.format(roundTimeMin, roundTimeMean, roundTimeMax)) + out.write('# tx (min={}, mean={}, max={})\n'.format(tmin, txMean, tmax)) + out.write('# tx (min={}, mean={}, max={}) (/(mean rnd))\n'.format(tmin/roundTimeMean, txMean/roundTimeMean, tmax/roundTimeMean)) + return 0 + +if __name__ == '__main__': + main() diff --git a/test/scripts/latency_plot.py b/test/scripts/latency_plot.py new file mode 100644 index 0000000000..aab94d4391 --- /dev/null +++ b/test/scripts/latency_plot.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +# +# process pingpong TotalLatencyOut output file into a graph +# +# requires: +# pip install matplotlib + +import argparse +import gzip +import math +import os +import statistics +import sys +import tarfile + +def mmstdm(data): + dmin = min(data) + dmax = max(data) + dmean = statistics.mean(data) + dstd = statistics.pstdev(data) + return f'[{dmin:.3f}/{dmean:.3f} ({dstd:.3f})/{dmax:.3f}]' + +class LatencyAnalyzer: + def __init__(self): + self.data = [] + def read(self, path): + if path.endswith('.gz'): + with gzip.open(path, 'rt') as fin: + self.rlines(fin) + else: + with open(path, 'rt') as fin: + self.rlines(fin) + def rlines(self, linesource): + for line in linesource: + rec = int(line.strip())/1000000000.0 + self.data.append(rec) + def plot(self, outname): + from matplotlib import pyplot as plt + plt.plot(self.data) + plt.savefig(outname + '.png', format='png') + plt.savefig(outname + '.svg', format='svg') + def report_data(self): + self.data.sort() + some = int(math.log(len(self.data))*4) + lowest = self.data[:some] + highest = self.data[-some:] + return some, lowest, highest + def report(self): + lines = [] + some, lowest, highest = self.report_data() + lines.append(f'{len(self.data)} points: {mmstdm(self.data)}') + lines.append('min {:.3f}s'.format(min(self.data))) + lines.append('max {:.3f}s'.format(max(self.data))) + lines.append(f'lowest-{some}: {mmstdm(lowest)}') + lines.append(f'highest-{some}: {mmstdm(highest)}') + return '\n'.join(lines) + def report_html(self): + lines = [] + some, lowest, highest = self.report_data() + lines.append('

Latency

') + lines.append('
[min/mean (std)/max]
') + lines.append(f'
{len(self.data)} points: {mmstdm(self.data)}
') + lines.append(f'
min {min(self.data):.3f}s
') + lines.append(f'
max {max(self.data):.3f}s
') + lines.append(f'
lowest-{some}: {mmstdm(lowest)}
') + lines.append(f'
highest-{some}: {mmstdm(highest)}
') + return ''.join(lines) + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument('latency_log', nargs='*') + ap.add_argument('-p', '--plot', help='plot base name for .png .svg') + ap.add_argument('-a', '--aout', help='report text output path (append)') + ap.add_argument('-H', '--ahtml', help='report html output path (append)') + ap.add_argument('--tardir') + args = ap.parse_args() + + la = LatencyAnalyzer() + for path in args.latency_log: + la.read(path) + if args.tardir: + for dirpath, dirnames, filenames in os.walk(args.tardir): + for fname in filenames: + if fname.endswith('.tar.bz2'): + tarname = os.path.join(dirpath, fname) + tf = tarfile.open(tarname, 'r:bz2') + for tinfo in tf: + if not tinfo.isfile(): + continue + if 'latency' in tinfo.name: + rawf = tf.extractfile(tinfo) + if tinfo.name.endswith('.gz'): + fin = gzip.open(rawf, 'rt') + else: + fin = rawf + rawf = None + try: + la.rlines(fin) + except Exception as e: + sys.stderr.write(f'{tarname}/{tinfo.name}: {e}\n') + fin.close() + if rawf is not None: + rawf.close() + + if args.plot: + la.plot(args.plot) + if args.ahtml: + with open(args.ahtml, 'at') as fout: + fout.write(la.report_html()) + if args.aout: + with open(args.aout, 'at') as fout: + fout.write(la.report()) + if (not args.aout) and (not args.ahtml): + print(la.report()) + +if __name__ == '__main__': + main()