Prepare and write txn rows in parallel #802

Merged · 3 commits · Dec 3, 2021
Changes from 2 commits.
30 changes: 30 additions & 0 deletions idb/postgres/internal/writer/util.go
@@ -0,0 +1,30 @@
package writer

import (
    "github.com/jackc/pgx/v4"
)

// copyFromChannelStruct implements pgx.CopyFromSource, yielding rows received
// from a channel until the channel is closed.
type copyFromChannelStruct struct {
    ch   chan []interface{}
    next []interface{}
}

// Next blocks until the producer sends the next row or closes the channel.
func (c *copyFromChannelStruct) Next() bool {
    var ok bool
    c.next, ok = <-c.ch
    return ok
}

func (c *copyFromChannelStruct) Values() ([]interface{}, error) {
    return c.next, nil
}

// Err is always nil; the producing goroutine reports any error out of band
// (see addTransactions in writer.go).
func (c *copyFromChannelStruct) Err() error {
    return nil
}

func copyFromChannel(ch chan []interface{}) pgx.CopyFromSource {
    return &copyFromChannelStruct{ch: ch}
}
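
For context: pgx's CopyFrom drives a CopyFromSource by calling Next() and Values() until Next() returns false, which here happens when the channel is closed. A minimal sketch of the intended wiring (hypothetical helper; the PR's real producer is yieldTransactions in writer.go below, with the context and pgx imports assumed):

    // writeRowsViaChannel streams prepared rows into the txn table; a producer
    // goroutine feeds the channel while CopyFrom drains it concurrently.
    func writeRowsViaChannel(conn *pgx.Conn, rows [][]interface{}) (int64, error) {
        ch := make(chan []interface{}, 64)

        go func() {
            // Closing the channel makes Next() return false, ending the copy.
            defer close(ch)
            for _, row := range rows {
                ch <- row
            }
        }()

        return conn.CopyFrom(
            context.Background(),
            pgx.Identifier{"txn"},
            []string{"round", "intra", "typeenum", "asset", "txid", "txn", "extra"},
            copyFromChannel(ch))
    }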
83 changes: 57 additions & 26 deletions idb/postgres/internal/writer/writer.go
@@ -196,20 +196,20 @@ func transactionAssetID(stxnad *transactions.SignedTxnWithAD, intra uint, block
     return assetid, nil
 }
 
-// addInnerTransactions traverses the inner transaction tree and adds them to
-// the transaction table. It performs a preorder traversal to correctly compute
+// Traverses the inner transaction tree and writes database rows
+// to `outCh`. It performs a preorder traversal to correctly compute
 // the intra round offset, the offset for the next transaction is returned.
-func (w *Writer) addInnerTransactions(stxnad *transactions.SignedTxnWithAD, block *bookkeeping.Block, intra, rootIntra uint, rootTxid string, rows [][]interface{}) (uint, [][]interface{}, error) {
+func yieldInnerTransactions(ctx context.Context, stxnad *transactions.SignedTxnWithAD, block *bookkeeping.Block, intra, rootIntra uint, rootTxid string, outCh chan []interface{}) (uint, error) {
     for _, itxn := range stxnad.ApplyData.EvalDelta.InnerTxns {
         txn := &itxn.Txn
         typeenum, ok := idb.GetTypeEnum(txn.Type)
         if !ok {
-            return 0, nil, fmt.Errorf("addInnerTransactions() get type enum")
+            return 0, fmt.Errorf("yieldInnerTransactions() get type enum")
         }
         // block shouldn't be used for inner transactions.
         assetid, err := transactionAssetID(&itxn, 0, nil)
         if err != nil {
-            return 0, nil, err
+            return 0, err
         }
         extra := idb.TxnExtra{
             AssetCloseAmount: itxn.ApplyData.AssetClosingAmount,
@@ -221,28 +221,30 @@ func (w *Writer) addInnerTransactions(stxnad *transactions.SignedTxnWithAD, bloc
         // To reconstruct a full object the root transaction must be fetched.
         txnNoInner := *stxnad
         txnNoInner.EvalDelta.InnerTxns = nil
-        rows = append(rows, []interface{}{
+        row := []interface{}{
             uint64(block.Round()), intra, int(typeenum), assetid,
             nil, // inner transactions do not have a txid.
             encoding.EncodeSignedTxnWithAD(txnNoInner),
-            encoding.EncodeTxnExtra(&extra)})
+            encoding.EncodeTxnExtra(&extra)}
+        select {
+        case <-ctx.Done():
+            return 0, fmt.Errorf("yieldInnerTransactions() ctx.Err(): %w", ctx.Err())
+        case outCh <- row:
+        }
 
         // Recurse at end for preorder traversal
-        intra, rows, err =
-            w.addInnerTransactions(&itxn, block, intra+1, rootIntra, rootTxid, rows)
+        intra, err =
+            yieldInnerTransactions(ctx, &itxn, block, intra+1, rootIntra, rootTxid, outCh)
         if err != nil {
-            return 0, nil, err
+            return 0, err
         }
     }
 
-    return intra, rows, nil
+    return intra, nil
 }
 
-// Add transactions from `block` to the database. `modifiedTxns` contains enhanced
-// apply data generated by evaluator.
-func (w *Writer) addTransactions(block *bookkeeping.Block, modifiedTxns []transactions.SignedTxnInBlock) error {
-    var rows [][]interface{}
-
+// Writes database rows for transactions (including inner transactions) to `outCh`.
+func yieldTransactions(ctx context.Context, block *bookkeeping.Block, modifiedTxns []transactions.SignedTxnInBlock, outCh chan []interface{}) error {
     intra := uint(0)
     for idx, stib := range block.Payset {
         var stxnad transactions.SignedTxnWithAD
@@ -251,13 +253,13 @@ func (w *Writer) addTransactions(block *bookkeeping.Block, modifiedTxns []transa
         // correct transaction hash.
         stxnad.SignedTxn, stxnad.ApplyData, err = block.BlockHeader.DecodeSignedTxn(stib)
         if err != nil {
-            return fmt.Errorf("addTransactions() decode signed txn err: %w", err)
+            return fmt.Errorf("yieldTransactions() decode signed txn err: %w", err)
         }
 
         txn := &stxnad.Txn
         typeenum, ok := idb.GetTypeEnum(txn.Type)
         if !ok {
-            return fmt.Errorf("addTransactions() get type enum")
+            return fmt.Errorf("yieldTransactions() get type enum")
         }
         assetid, err := transactionAssetID(&stxnad, intra, block)
         if err != nil {
@@ -268,24 +270,53 @@ func (w *Writer) addTransactions(block *bookkeeping.Block, modifiedTxns []transa
         extra := idb.TxnExtra{
             AssetCloseAmount: modifiedTxns[idx].ApplyData.AssetClosingAmount,
         }
-        rows = append(rows, []interface{}{
+        row := []interface{}{
             uint64(block.Round()), intra, int(typeenum), assetid, id,
             encoding.EncodeSignedTxnWithAD(stxnad),
-            encoding.EncodeTxnExtra(&extra)})
+            encoding.EncodeTxnExtra(&extra)}
+        select {
+        case <-ctx.Done():
+            return fmt.Errorf("yieldTransactions() ctx.Err(): %w", ctx.Err())
+        case outCh <- row:
+        }
 
-        intra, rows, err = w.addInnerTransactions(&stib.SignedTxnWithAD, block, intra+1, intra, id, rows)
+        intra, err = yieldInnerTransactions(
+            ctx, &stib.SignedTxnWithAD, block, intra+1, intra, id, outCh)
         if err != nil {
-            return fmt.Errorf("addTransactions() adding inner: %w", err)
+            return fmt.Errorf("yieldTransactions() adding inner: %w", err)
         }
     }
 
-    _, err := w.tx.CopyFrom(
+    return nil
+}
+
+// Add transactions from `block` to the database. `modifiedTxns` contains enhanced
+// apply data generated by evaluator.
+func (w *Writer) addTransactions(block *bookkeeping.Block, modifiedTxns []transactions.SignedTxnInBlock) error {
+    ctx, cancelFunc := context.WithCancel(context.Background())
+    defer cancelFunc()
+
+    ch := make(chan []interface{}, 64)

Review thread on the channel buffer size (`ch := make(chan []interface{}, 64)`):

Contributor:

By definition, we're doing this because serialization is slower than the network. Therefore we shouldn't need a buffered channel.

If we do keep the buffered channel, you'll need to make sure we remove any unread items from the channel.

Contributor Author:

No, serialization is faster than postgres.

Unread items in case of an error? The channel will be garbage collected.

Contributor:

Since serialization is faster, doesn't that mean a buffer is not required? As long as we're able to have 1 pending object to load, there would never be any downtime.

Re: garbage collection. I didn't realize the unread items would be garbage collected, but it makes sense that this would be the case.

Contributor:

I'm not sure if there is a more authoritative source for this, but here is a reference to something I've heard before:
https://github.com/uber-go/guide/blob/master/style.md#channel-size-is-one-or-none

Contributor Author (@tolikzinovyev, Dec 2, 2021):

If one were much faster than the other, this PR wouldn't improve anything. However, here postgres and serialization are somewhat comparable in speed.

Our postgres driver creates buffers of 65KB and writes them to the network. Depending on whether the "network" has a large enough buffer, this call may block, and we would waste CPU time that we could use for serialization. So actually, I'm thinking we should increase the buffer to 1024 to be safe. What do you think?

I've seen this Uber style guide. I think they mainly worry about the blocking that can occur when writing to a channel, but when the buffer size is large, blocking occurs only rarely in production and not in testing.

Contributor:

I found the 65KB number you mentioned, but it doesn't seem to work the way you described. It doesn't accumulate 65KB of data; it reads up to 65KB of data, then sends it immediately. Since it's TCP it needs to be received correctly (the wasted CPU time you mentioned), but there doesn't seem to be a way to speed that up, so I don't think we need a large buffer of transactions.

https://github.com/jackc/pgconn/blob/master/pgconn.go#L1199

    go func() {
        buf := make([]byte, 0, 65536)
        buf = append(buf, 'd')
        sp := len(buf)

        for {

            // Here, r is the read side of an in-memory pipe (io.Pipe). When you call
            // CopyFrom, data is written into the pipe. 65KB isn't a target, it's a
            // temporary buffer; there is no guarantee that it will be filled.
            // To be more precise, from the io.Pipe documentation:
            //    Reads and Writes on the pipe are matched one to one except when multiple Reads are
            //    needed to consume a single Write. That is, each Write to the PipeWriter blocks until it
            //    has satisfied one or more Reads from the PipeReader that fully consume the written data.

            n, readErr := r.Read(buf[5:cap(buf)])
            if n > 0 {
                buf = buf[0 : n+5]
                pgio.SetInt32(buf[sp:], int32(n+4))

                // If any data was read... 1 byte or all 65KB, it's written to the network.
                // This means that the writer is blocked here regardless of how large the buffered
                // channel is. After the read, our code will queue the next row in the pipe, and
                // queue the one after in the channel.

                _, writeErr := pgConn.conn.Write(buf)
                if writeErr != nil {
                    // Write errors are always fatal, but we can't use asyncClose because we are in a different goroutine.
                    pgConn.conn.Close()

                    copyErrChan <- writeErr
                    return
                }
            }
            if readErr != nil {
                copyErrChan <- readErr
                return
            }

            select {
            case <-abortCopyChan:
                return
            default:
            }
        }
    }()

It seems like having a large buffered channel wouldn't help anything. Did you see that performance was affected in your testing?
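
The one-to-one Write/Read matching that the quoted io.Pipe documentation describes is easy to observe directly. A small standalone program (not from the PR) that mimics a slow network reader; the 70000-byte Write only returns after a second Read consumes the leftover bytes:

    package main

    import (
        "fmt"
        "io"
        "time"
    )

    func main() {
        r, w := io.Pipe()

        go func() {
            start := time.Now()
            // One Write of slightly more than 65536 bytes, like pgx's copy buffer.
            n, err := w.Write(make([]byte, 70000))
            fmt.Printf("Write returned n=%d err=%v after %v\n", n, err, time.Since(start))
            w.Close()
        }()

        buf := make([]byte, 65536)
        for {
            time.Sleep(100 * time.Millisecond) // stand-in for a slow TCP connection
            n, err := r.Read(buf)
            fmt.Printf("read %d bytes\n", n)
            if err != nil { // io.EOF once the writer closes
                return
            }
        }
    }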

Contributor Author:

If you agree that we would waste CPU time with an unbuffered channel, then as soon as w.Write() returns, where w is the write side of the pipe, we need to spend time serializing transactions until we can call w.Write() again. I tried tracing through the code, and I think it boils down to how fast the write syscall returns. That seems to be platform dependent, so I would rather introduce a big enough buffer in our code. In any case, it's small enough not to worry about.

Contributor:

"Waste" is the wrong term, since the goal isn't to use as much CPU as possible. From what I see, we need one pending round to continuously load the DB. Thanks to the pipe, we already have 2 queued up with an unbuffered channel.

How did you get the 8% metric? Could you try with an unbuffered channel to see if it has the same performance?

Contributor Author:

The problem with that pipe is that the writer writes slightly more than 2^16 bytes and unblocks only when all of it has been read. However, the reader reads slightly less than 2^16 bytes and writes that to the network before reading the remaining couple of bytes.

I'm running tests.

Contributor Author:

chan size | TPS
        0 | 6798.43
       64 | 6891.80
     1024 | 6946.75

+    var err0 error
+    go func() {
+        err0 = yieldTransactions(ctx, block, modifiedTxns, ch)
+        close(ch)
+    }()
+
+    _, err1 := w.tx.CopyFrom(
         context.Background(),
         pgx.Identifier{"txn"},
         []string{"round", "intra", "typeenum", "asset", "txid", "txn", "extra"},
-        pgx.CopyFromRows(rows))
-    if err != nil {
-        return fmt.Errorf("addTransactions() copy from err: %w", err)
+        copyFromChannel(ch))
+    if err1 != nil {
+        // Exiting here will call `cancelFunc` which will cause the goroutine above to exit.
+        return fmt.Errorf("addTransactions() copy from err: %w", err1)
     }
+
+    // CopyFrom() exited successfully, so `ch` has been closed, so `err0` has been
+    // written to, and we can read it without worrying about data races.
+    if err0 != nil {
+        return fmt.Errorf("addTransactions() err: %w", err0)
+    }
 
     return nil
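
Aside: the TPS numbers in the review thread above came from running the indexer against postgres. To look at just the producer/consumer handoff in isolation, a rough micro-benchmark could look like the following (hypothetical, not part of the PR; it belongs in a _test.go file importing testing, and the consumer's token work stands in for pgx encoding and socket writes):

    func benchmarkHandoff(b *testing.B, capacity int) {
        ch := make(chan []interface{}, capacity)
        done := make(chan struct{})

        go func() {
            defer close(done)
            sink := 0
            for row := range ch {
                // Token work standing in for encoding the row and writing it out.
                for i := 0; i < 100; i++ {
                    sink += len(row)
                }
            }
            _ = sink
        }()

        row := make([]interface{}, 7) // round, intra, typeenum, asset, txid, txn, extra
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
            ch <- row
        }
        close(ch)
        <-done
    }

    func BenchmarkHandoff0(b *testing.B)    { benchmarkHandoff(b, 0) }
    func BenchmarkHandoff64(b *testing.B)   { benchmarkHandoff(b, 64) }
    func BenchmarkHandoff1024(b *testing.B) { benchmarkHandoff(b, 1024) }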
Expand Down