From 4ea7c883f89c64c8bed8a68060397300ad20724a Mon Sep 17 00:00:00 2001 From: Tolik Zinovyev Date: Tue, 30 Nov 2021 16:54:56 -0500 Subject: [PATCH 1/3] Prepare and write txn rows in parallel. --- idb/postgres/internal/writer/util.go | 29 +++++++++ idb/postgres/internal/writer/writer.go | 83 ++++++++++++++++++-------- 2 files changed, 86 insertions(+), 26 deletions(-) create mode 100644 idb/postgres/internal/writer/util.go diff --git a/idb/postgres/internal/writer/util.go b/idb/postgres/internal/writer/util.go new file mode 100644 index 000000000..df606de73 --- /dev/null +++ b/idb/postgres/internal/writer/util.go @@ -0,0 +1,29 @@ +package writer + +import ( + "github.com/jackc/pgx/v4" +) + +type copyFromChannelStruct struct { + ch chan []interface{} + next []interface{} + err error +} + +func (c *copyFromChannelStruct) Next() bool { + var ok bool + c.next, ok = <-c.ch + return ok +} + +func (c *copyFromChannelStruct) Values() ([]interface{}, error) { + return c.next, nil +} + +func (c *copyFromChannelStruct) Err() error { + return nil +} + +func copyFromChannel(ch chan []interface{}) pgx.CopyFromSource { + return &copyFromChannelStruct{ch: ch} +} diff --git a/idb/postgres/internal/writer/writer.go b/idb/postgres/internal/writer/writer.go index ab8695edb..93ccb1ac4 100644 --- a/idb/postgres/internal/writer/writer.go +++ b/idb/postgres/internal/writer/writer.go @@ -196,20 +196,20 @@ func transactionAssetID(stxnad *transactions.SignedTxnWithAD, intra uint, block return assetid, nil } -// addInnerTransactions traverses the inner transaction tree and adds them to -// the transaction table. It performs a preorder traversal to correctly compute +// Traverses the inner transaction tree and writes database rows +// to `outCh`. It performs a preorder traversal to correctly compute // the intra round offset, the offset for the next transaction is returned. 
-func (w *Writer) addInnerTransactions(stxnad *transactions.SignedTxnWithAD, block *bookkeeping.Block, intra, rootIntra uint, rootTxid string, rows [][]interface{}) (uint, [][]interface{}, error) { +func yieldInnerTransactions(ctx context.Context, stxnad *transactions.SignedTxnWithAD, block *bookkeeping.Block, intra, rootIntra uint, rootTxid string, outCh chan []interface{}) (uint, error) { for _, itxn := range stxnad.ApplyData.EvalDelta.InnerTxns { txn := &itxn.Txn typeenum, ok := idb.GetTypeEnum(txn.Type) if !ok { - return 0, nil, fmt.Errorf("addInnerTransactions() get type enum") + return 0, fmt.Errorf("yieldInnerTransactions() get type enum") } // block shouldn't be used for inner transactions. assetid, err := transactionAssetID(&itxn, 0, nil) if err != nil { - return 0, nil, err + return 0, err } extra := idb.TxnExtra{ AssetCloseAmount: itxn.ApplyData.AssetClosingAmount, @@ -221,28 +221,30 @@ func (w *Writer) addInnerTransactions(stxnad *transactions.SignedTxnWithAD, bloc // To reconstruct a full object the root transaction must be fetched. txnNoInner := *stxnad txnNoInner.EvalDelta.InnerTxns = nil - rows = append(rows, []interface{}{ + row := []interface{}{ uint64(block.Round()), intra, int(typeenum), assetid, nil, // inner transactions do not have a txid. encoding.EncodeSignedTxnWithAD(txnNoInner), - encoding.EncodeTxnExtra(&extra)}) + encoding.EncodeTxnExtra(&extra)} + select { + case <-ctx.Done(): + return 0, fmt.Errorf("yieldInnerTransactions() ctx.Err(): %w", ctx.Err()) + case outCh <- row: + } // Recurse at end for preorder traversal - intra, rows, err = - w.addInnerTransactions(&itxn, block, intra+1, rootIntra, rootTxid, rows) + intra, err = + yieldInnerTransactions(ctx, &itxn, block, intra+1, rootIntra, rootTxid, outCh) if err != nil { - return 0, nil, err + return 0, err } } - return intra, rows, nil + return intra, nil } -// Add transactions from `block` to the database. `modifiedTxns` contains enhanced -// apply data generated by evaluator. 
-func (w *Writer) addTransactions(block *bookkeeping.Block, modifiedTxns []transactions.SignedTxnInBlock) error { - var rows [][]interface{} - +// Writes database rows for transactions (including inner transactions) to `outCh`. +func yieldTransactions(ctx context.Context, block *bookkeeping.Block, modifiedTxns []transactions.SignedTxnInBlock, outCh chan []interface{}) error { intra := uint(0) for idx, stib := range block.Payset { var stxnad transactions.SignedTxnWithAD @@ -251,13 +253,13 @@ func (w *Writer) addTransactions(block *bookkeeping.Block, modifiedTxns []transa // correct transaction hash. stxnad.SignedTxn, stxnad.ApplyData, err = block.BlockHeader.DecodeSignedTxn(stib) if err != nil { - return fmt.Errorf("addTransactions() decode signed txn err: %w", err) + return fmt.Errorf("yieldTransactions() decode signed txn err: %w", err) } txn := &stxnad.Txn typeenum, ok := idb.GetTypeEnum(txn.Type) if !ok { - return fmt.Errorf("addTransactions() get type enum") + return fmt.Errorf("yieldTransactions() get type enum") } assetid, err := transactionAssetID(&stxnad, intra, block) if err != nil { @@ -268,24 +270,53 @@ func (w *Writer) addTransactions(block *bookkeeping.Block, modifiedTxns []transa extra := idb.TxnExtra{ AssetCloseAmount: modifiedTxns[idx].ApplyData.AssetClosingAmount, } - rows = append(rows, []interface{}{ + row := []interface{}{ uint64(block.Round()), intra, int(typeenum), assetid, id, encoding.EncodeSignedTxnWithAD(stxnad), - encoding.EncodeTxnExtra(&extra)}) + encoding.EncodeTxnExtra(&extra)} + select { + case <-ctx.Done(): + return fmt.Errorf("yieldTransactions() ctx.Err(): %w", ctx.Err()) + case outCh <- row: + } - intra, rows, err = w.addInnerTransactions(&stib.SignedTxnWithAD, block, intra+1, intra, id, rows) + intra, err = yieldInnerTransactions( + ctx, &stib.SignedTxnWithAD, block, intra+1, intra, id, outCh) if err != nil { - return fmt.Errorf("addTransactions() adding inner: %w", err) + return fmt.Errorf("yieldTransactions() adding inner: 
%w", err) } } - _, err := w.tx.CopyFrom( + return nil +} + +// Add transactions from `block` to the database. `modifiedTxns` contains enhanced +// apply data generated by evaluator. +func (w *Writer) addTransactions(block *bookkeeping.Block, modifiedTxns []transactions.SignedTxnInBlock) error { + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + + ch := make(chan []interface{}, 64) + var err0 error + go func() { + err0 = yieldTransactions(ctx, block, modifiedTxns, ch) + close(ch) + }() + + _, err1 := w.tx.CopyFrom( context.Background(), pgx.Identifier{"txn"}, []string{"round", "intra", "typeenum", "asset", "txid", "txn", "extra"}, - pgx.CopyFromRows(rows)) - if err != nil { - return fmt.Errorf("addTransactions() copy from err: %w", err) + copyFromChannel(ch)) + if err1 != nil { + // Exiting here will call `cancelFunc` which will cause the goroutine above to exit. + return fmt.Errorf("addTransactions() copy from err: %w", err1) + } + + // CopyFrom() exited successfully, so `ch` has been closed, so `err0` has been + // written to, and we can read it without worrying about data races. + if err0 != nil { + return fmt.Errorf("addTransactions() err: %w", err0) } return nil From 959335f56ba08fb6f546110141c39e379097b9c3 Mon Sep 17 00:00:00 2001 From: Tolik Zinovyev Date: Tue, 30 Nov 2021 19:15:31 -0500 Subject: [PATCH 2/3] Add a comment. --- idb/postgres/internal/writer/util.go | 1 + 1 file changed, 1 insertion(+) diff --git a/idb/postgres/internal/writer/util.go b/idb/postgres/internal/writer/util.go index df606de73..1372b234a 100644 --- a/idb/postgres/internal/writer/util.go +++ b/idb/postgres/internal/writer/util.go @@ -4,6 +4,7 @@ import ( "github.com/jackc/pgx/v4" ) +// Implements pgx.CopyFromSource. 
type copyFromChannelStruct struct { ch chan []interface{} next []interface{} From 2486257b338330acee54aaaf9a75b6fd57013f9b Mon Sep 17 00:00:00 2001 From: Tolik Zinovyev Date: Fri, 3 Dec 2021 16:38:54 -0500 Subject: [PATCH 3/3] Change the channel capacity to 1024. --- idb/postgres/internal/writer/writer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/idb/postgres/internal/writer/writer.go b/idb/postgres/internal/writer/writer.go index 93ccb1ac4..352682569 100644 --- a/idb/postgres/internal/writer/writer.go +++ b/idb/postgres/internal/writer/writer.go @@ -296,7 +296,9 @@ func (w *Writer) addTransactions(block *bookkeeping.Block, modifiedTxns []transa ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() - ch := make(chan []interface{}, 64) + // The capacity of this channel roughly corresponds to the buffer size in the postgres + // driver: 2^16 bytes. + ch := make(chan []interface{}, 1024) var err0 error go func() { err0 = yieldTransactions(ctx, block, modifiedTxns, ch)