Skip to content

Commit

Permalink
Drop txnbytes; prepare and write txn rows in parallel.
Browse files Browse the repository at this point in the history
  • Loading branch information
tolikzinovyev committed Nov 18, 2021
1 parent 5af2d40 commit 95bef63
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 71 deletions.
4 changes: 2 additions & 2 deletions cmd/block-generator/runner/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,8 @@ func startIndexer(logfile string, loglevel string, indexerBinary string, algodNe

var stdout bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
cmd.Stdout = os.Stderr
cmd.Stderr = os.Stderr

if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failure calling Start(): %w", err)
Expand Down
1 change: 0 additions & 1 deletion idb/postgres/internal/schema/setup_postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ intra integer NOT NULL,
typeenum smallint NOT NULL,
asset bigint NOT NULL, -- 0=Algos, otherwise AssetIndex
txid bytea, -- base32 of [32]byte hash, or NULL for inner transactions.
txnbytes bytea, -- msgpack encoding of signed txn with apply data, or NULL for inner transactions.
txn jsonb NOT NULL, -- json encoding of signed txn with apply data
extra jsonb NOT NULL,
PRIMARY KEY ( round, intra )
Expand Down
1 change: 0 additions & 1 deletion idb/postgres/internal/schema/setup_postgres_sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

122 changes: 56 additions & 66 deletions idb/postgres/internal/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,96 +181,80 @@ func transactionAssetID(stxnad *transactions.SignedTxnWithAD, intra uint, block
return assetid, nil
}

// addInnerTransactions traverses the inner transaction tree and adds them to
// the transaction table. It performs a preorder traversal to correctly compute
// the intra round offset, the offset for the next transaction is returned.
func (w *Writer) addInnerTransactions(stxnad *transactions.SignedTxnWithAD, block *bookkeeping.Block, intra, rootIntra uint, rootTxid string, rows [][]interface{}) (uint, [][]interface{}, error) {
for _, itxn := range stxnad.ApplyData.EvalDelta.InnerTxns {
txn := &itxn.Txn
typeenum, ok := idb.GetTypeEnum(txn.Type)
if !ok {
return 0, nil, fmt.Errorf("addInnerTransactions() get type enum")
}
// block shouldn't be used for inner transactions.
assetid, err := transactionAssetID(&itxn, 0, nil)
if err != nil {
return 0, nil, err
}
extra := idb.TxnExtra{
AssetCloseAmount: itxn.ApplyData.AssetClosingAmount,
RootIntra: idb.OptionalUint{Present: true, Value: rootIntra},
RootTxid: rootTxid,
}
type copyRow struct {
value []interface{}
err error
}

// When encoding an inner transaction we remove any further nested inner transactions.
// To reconstruct a full object the root transaction must be fetched.
txnNoInner := *stxnad
txnNoInner.EvalDelta.InnerTxns = nil
rows = append(rows, []interface{}{
uint64(block.Round()), intra, int(typeenum), assetid,
nil, // inner transactions do not have a txid.
nil, // txn bytes are only in the parent.
encoding.EncodeSignedTxnWithAD(txnNoInner),
encoding.EncodeTxnExtra(&extra)})

// Recurse at end for preorder traversal
intra, rows, err =
w.addInnerTransactions(&itxn, block, intra+1, rootIntra, rootTxid, rows)
if err != nil {
return 0, nil, err
}
type copyFromChannelStruct struct {
ch chan copyRow
next copyRow
err error
}

func (c *copyFromChannelStruct) Next() bool {
var ok bool
c.next, ok = <-c.ch
return ok
}

func (c *copyFromChannelStruct) Values() ([]interface{}, error) {
if c.next.err != nil {
c.err = c.next.err
return nil, c.err
}
return c.next.value, c.next.err
}

return intra, rows, nil
func (c *copyFromChannelStruct) Err() error {
return c.err
}

// Add transactions from `block` to the database. `modifiedTxns` contains enhanced
// apply data generated by evaluator.
func (w *Writer) addTransactions(block *bookkeeping.Block, modifiedTxns []transactions.SignedTxnInBlock) error {
var rows [][]interface{}
func copyFromChannel(ch chan copyRow) pgx.CopyFromSource {
return &copyFromChannelStruct{ch: ch}
}

func yieldTransactions(block *bookkeeping.Block, modifiedTxns []transactions.SignedTxnInBlock, outch chan copyRow) {
intra := uint(0)
for idx, stib := range block.Payset {
var stxnad transactions.SignedTxnWithAD
var err error
// This function makes sure to set correct genesis information so we can get the
// correct transaction hash.
stxnad.SignedTxn, stxnad.ApplyData, err = block.BlockHeader.DecodeSignedTxn(stib)
if err != nil {
return fmt.Errorf("addTransactions() decode signed txn err: %w", err)
}
stxnad.SignedTxn, stxnad.ApplyData, _ = block.BlockHeader.DecodeSignedTxn(stib)

txn := &stxnad.Txn
typeenum, ok := idb.GetTypeEnum(txn.Type)
if !ok {
return fmt.Errorf("addTransactions() get type enum")
}
assetid, err := transactionAssetID(&stxnad, intra, block)
if err != nil {
return err
}
typeenum, _ := idb.GetTypeEnum(txn.Type)
assetid, _ := transactionAssetID(&stxnad, intra, block)
id := txn.ID().String()

extra := idb.TxnExtra{
AssetCloseAmount: modifiedTxns[idx].ApplyData.AssetClosingAmount,
}
rows = append(rows, []interface{}{
uint64(block.Round()), intra, int(typeenum), assetid, id,
protocol.Encode(&stxnad),
encoding.EncodeSignedTxnWithAD(stxnad),
encoding.EncodeTxnExtra(&extra)})

intra, rows, err = w.addInnerTransactions(&stib.SignedTxnWithAD, block, intra+1, intra, id, rows)
if err != nil {
return fmt.Errorf("addTransactions() adding inner: %w", err)
outch <- copyRow{
value: []interface{}{
uint64(block.Round()), intra, int(typeenum), assetid, id,
encoding.EncodeSignedTxnWithAD(stxnad),
encoding.EncodeTxnExtra(&extra)},
err: nil,
}

intra++
}

close(outch)
}

// Add transactions from `block` to the database. `modifiedTxns` contains enhanced
// apply data generated by evaluator.
func (w *Writer) addTransactions(block *bookkeeping.Block, modifiedTxns []transactions.SignedTxnInBlock) error {
ch := make(chan copyRow, 64)
go yieldTransactions(block, modifiedTxns, ch)

_, err := w.tx.CopyFrom(
context.Background(),
pgx.Identifier{"txn"},
[]string{"round", "intra", "typeenum", "asset", "txid", "txnbytes", "txn", "extra"},
pgx.CopyFromRows(rows))
[]string{"round", "intra", "typeenum", "asset", "txid", "txn", "extra"},
copyFromChannel(ch))
if err != nil {
return fmt.Errorf("addTransactions() copy from err: %w", err)
}
Expand Down Expand Up @@ -515,15 +499,20 @@ func (w *Writer) AddBlock0(block *bookkeeping.Block) error {

// AddBlock writes the block and accounting state deltas to the database.
func (w *Writer) AddBlock(block *bookkeeping.Block, modifiedTxns []transactions.SignedTxnInBlock, delta ledgercore.StateDelta) error {
start := time.Now()
err := w.addTransactions(block, modifiedTxns)
if err != nil {
return fmt.Errorf("AddBlock() err: %w", err)
}
fmt.Printf("txn: %v\n", time.Since(start))
start = time.Now()
err = w.addTransactionParticipation(block)
if err != nil {
return fmt.Errorf("AddBlock() err: %w", err)
}
fmt.Printf("txn part: %v\n", time.Since(start))

start = time.Now()
var batch pgx.Batch

addBlockHeader(&block.BlockHeader, &batch)
Expand Down Expand Up @@ -557,6 +546,7 @@ func (w *Writer) AddBlock(block *bookkeeping.Block, modifiedTxns []transactions.
if err != nil {
return fmt.Errorf("AddBlock() close results err: %w", err)
}
fmt.Printf("batch: %v\n", time.Since(start))

return nil
}
7 changes: 6 additions & 1 deletion idb/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,23 +239,28 @@ func (db *IndexerDb) AddBlock(block *bookkeeping.Block) error {
}
proto.EnableAssetCloseAmount = true

start := time.Now()
resources, err := prepareEvalResources(&ledgerForEval, block)
if err != nil {
return fmt.Errorf("AddBlock() eval err: %w", err)
}
fmt.Printf("prepare: %v\n", time.Since(start))

start := time.Now()
start = time.Now()
delta, modifiedTxns, err :=
ledger.EvalForIndexer(ledgerForEval, block, proto, resources)
if err != nil {
return fmt.Errorf("AddBlock() eval err: %w", err)
}
fmt.Printf("eval: %v\n", time.Since(start))
metrics.PostgresEvalTimeSeconds.Observe(time.Since(start).Seconds())

start = time.Now()
err = writer.AddBlock(block, modifiedTxns, delta)
if err != nil {
return fmt.Errorf("AddBlock() err: %w", err)
}
fmt.Printf("writer: %v\n", time.Since(start))
}

return nil
Expand Down

0 comments on commit 95bef63

Please sign in to comment.