Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FOR REVIEW ONLY: indexer 2.15.1 into master #1400

Merged
merged 5 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.15.0
2.15.1
5 changes: 0 additions & 5 deletions api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1359,11 +1359,6 @@ func (si *ServerImplementation) fetchBlock(ctx context.Context, round uint64, op

results := make([]generated.Transaction, 0)
for _, txrow := range transactions {
// Do not include inner transactions.
if txrow.RootTxn != nil {
continue
}

tx, err := txnRowToTransaction(txrow)
if err != nil {
return err
Expand Down
192 changes: 158 additions & 34 deletions api/handlers_e2e_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package api

import (
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"fmt"
Expand Down Expand Up @@ -476,23 +478,7 @@ func TestAccountMaxResultsLimit(t *testing.T) {
listenAddr := "localhost:8989"
go Serve(serverCtx, listenAddr, db, nil, logrus.New(), opts)

// wait at most a few seconds for server to come up
serverUp := false
for maxWait := 3 * time.Second; !serverUp && maxWait > 0; maxWait -= 50 * time.Millisecond {
time.Sleep(50 * time.Millisecond)
resp, err := http.Get("http://" + listenAddr + "/health")
if err != nil {
t.Log("waiting for server:", err)
continue
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Log("waiting for server OK:", resp.StatusCode)
continue
}
serverUp = true // server is up now
}
require.True(t, serverUp, "api.Serve did not start server in time")
waitForServer(t, listenAddr)

// make a real HTTP request (to additionally test generated param parsing logic)
makeReq := func(t *testing.T, path string, exclude []string, includeDeleted bool, next *string, limit *uint64) (*http.Response, []byte) {
Expand Down Expand Up @@ -1594,23 +1580,7 @@ func TestGetBlocksTransactionsLimit(t *testing.T) {
listenAddr := "localhost:8888"
go Serve(serverCtx, listenAddr, db, nil, logrus.New(), opts)

// wait at most a few seconds for server to come up
serverUp := false
for maxWait := 3 * time.Second; !serverUp && maxWait > 0; maxWait -= 50 * time.Millisecond {
time.Sleep(50 * time.Millisecond)
resp, err := http.Get("http://" + listenAddr + "/health")
if err != nil {
t.Log("waiting for server:", err)
continue
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Log("waiting for server OK:", resp.StatusCode)
continue
}
serverUp = true // server is up now
}
require.True(t, serverUp, "api.Serve did not start server in time")
waitForServer(t, listenAddr)

// make a real HTTP request (to additionally test generated param parsing logic)
makeReq := func(t *testing.T, path string, headerOnly bool) (*http.Response, []byte) {
Expand Down Expand Up @@ -1666,6 +1636,160 @@ func TestGetBlocksTransactionsLimit(t *testing.T) {
}
}

func TestGetBlockWithCompression(t *testing.T) {
db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis())
defer shutdownFunc()
defer l.Close()

///////////
// Given // A block containing 20 transactions at round 1
// //
///////////

const numbOfTxns = 20
var txns []transactions.SignedTxnWithAD
for j := 0; j < numbOfTxns; j++ {
txns = append(txns, test.MakePaymentTxn(1, 100, 0, 0, 0, 0, test.AccountA, test.AccountB, basics.Address{}, basics.Address{}))
}
ptxns := make([]*transactions.SignedTxnWithAD, numbOfTxns)
for k := range txns {
ptxns[k] = &txns[k]
}
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, ptxns...)
block.BlockHeader.Round = basics.Round(1)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

//////////
// When // We look up a block using a ServerImplementation with a compression flag on/off
//////////

serverCtx, serverCancel := context.WithCancel(context.Background())
defer serverCancel()
opts := defaultOpts
listenAddr := "localhost:8889"
go Serve(serverCtx, listenAddr, db, nil, logrus.New(), opts)

waitForServer(t, listenAddr)

getBlockFunc := func(t *testing.T, headerOnly bool, useCompression bool) *generated.BlockResponse {
path := "/v2/blocks/1"

client := &http.Client{}
req, err := http.NewRequest("GET", "http://"+listenAddr+path, nil)
require.NoError(t, err)
q := req.URL.Query()
if headerOnly {
q.Add("header-only", "true")
}
if useCompression {
req.Header.Add(echo.HeaderAcceptEncoding, "gzip")
}
req.URL.RawQuery = q.Encode()
t.Log("making HTTP request path", req.URL)
resp, err := client.Do(req)
require.NoError(t, err)

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode, fmt.Sprintf("unexpected return code, body: %s", string(body)))

var response generated.BlockResponse
if useCompression {
require.Equal(t, resp.Header.Get(echo.HeaderContentEncoding), "gzip")
reader, err := gzip.NewReader(bytes.NewReader(body))
require.NoError(t, err)

output, e2 := ioutil.ReadAll(reader)
require.NoError(t, e2)

body = output
}
err = json.Decode(body, &response)
require.NoError(t, err)

return &response
}

//////////
// Then // Get the same block content compared to uncompress block
//////////
notCompressedBlock := getBlockFunc(t, false, false)
compressedBlock := getBlockFunc(t, false, true)
require.Equal(t, notCompressedBlock, compressedBlock)
require.Equal(t, len(*notCompressedBlock.Transactions), numbOfTxns)

// we now make sure that compression flag works with other flags.
notCompressedBlock = getBlockFunc(t, true, false)
compressedBlock = getBlockFunc(t, true, true)
require.Equal(t, len(*notCompressedBlock.Transactions), 0)
}

func TestNoCompressionSupportForNonBlockAPI(t *testing.T) {
db, shutdownFunc, _, l := setupIdb(t, test.MakeGenesis())
defer shutdownFunc()
defer l.Close()

//////////
// When // we call the health endpoint using compression flag on
//////////

serverCtx, serverCancel := context.WithCancel(context.Background())
defer serverCancel()
opts := defaultOpts
listenAddr := "localhost:8887"
go Serve(serverCtx, listenAddr, db, nil, logrus.New(), opts)

waitForServer(t, listenAddr)

path := "/health"
client := &http.Client{}
req, err := http.NewRequest("GET", "http://"+listenAddr+path, nil)
require.NoError(t, err)
req.Header.Add(echo.HeaderAcceptEncoding, "gzip")

t.Log("making HTTP request path", req.URL)

resp, err := client.Do(req)
require.NoError(t, err)

//////////
// Then // We expect the result not to be compressed.
//////////

require.Equal(t, resp.Header.Get(echo.HeaderContentEncoding), "")
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode, fmt.Sprintf("unexpected return code, body: %s", string(body)))
var response generated.HealthCheckResponse
err = json.Decode(body, &response)
require.NoError(t, err)
}

func waitForServer(t *testing.T, listenAddr string) {
// wait at most a few seconds for server to come up
serverUp := false
for maxWait := 3 * time.Second; !serverUp && maxWait > 0; maxWait -= 50 * time.Millisecond {
time.Sleep(50 * time.Millisecond)
resp, err := http.Get("http://" + listenAddr + "/health")
if err != nil {
t.Log("waiting for server:", err)
continue
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Log("waiting for server OK:", resp.StatusCode)
continue
}
serverUp = true // server is up now
}
require.True(t, serverUp, "api.Serve did not start server in time")
}

// compareAppBoxesAgainstHandler is of type BoxTestComparator
func compareAppBoxesAgainstHandler(t *testing.T, db *postgres.IndexerDb,
appBoxes map[basics.AppIndex]map[string]string, deletedBoxes map[basics.AppIndex]map[string]bool, verifyTotals bool) {
Expand Down
8 changes: 8 additions & 0 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net"
"net/http"
"strings"
"time"

echo_contrib "github.com/labstack/echo-contrib/prometheus"
Expand Down Expand Up @@ -104,6 +105,13 @@ func Serve(ctx context.Context, serveAddr string, db idb.IndexerDb, fetcherError

e.Use(middlewares.MakeLogger(log))
e.Use(middleware.CORS())
e.Use(middleware.GzipWithConfig(middleware.GzipConfig{
// we currently support compressed result only for GET /v2/blocks/ API
Skipper: func(c echo.Context) bool {
return !strings.Contains(c.Path(), "/v2/blocks/")
},
Level: -1,
}))

middleware := make([]echo.MiddlewareFunc, 0)

Expand Down
7 changes: 7 additions & 0 deletions idb/idb.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ type GetBlockOptions struct {

// TransactionFilter is a parameter object with all the transaction filter options.
type TransactionFilter struct {
// SkipOptimization is used for testing to ensure the parameters are not modified.
SkipOptimization bool

// Address filtering transactions for one Address will
// return transactions newest-first proceding into the
// past. Paging through such results can be achieved by
Expand Down Expand Up @@ -240,6 +243,10 @@ type TransactionFilter struct {
// instead of the root txn.
ReturnInnerTxnOnly bool

// If this flag is set to true, then the query returns only root txns
// and no inner txns
ReturnRootTxnsOnly bool

// If this flag is set to true, then the query returns the block excluding
// the transactions
HeaderOnly bool
Expand Down
34 changes: 31 additions & 3 deletions idb/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"os"
"reflect"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -457,7 +458,7 @@ func (db *IndexerDb) GetBlock(ctx context.Context, round uint64, options idb.Get

if options.Transactions {
out := make(chan idb.TxnRow, 1)
query, whereArgs, err := buildTransactionQuery(idb.TransactionFilter{Round: &round, Limit: options.MaxTransactionsLimit + 1})
query, whereArgs, err := buildTransactionQuery(idb.TransactionFilter{Round: &round, Limit: options.MaxTransactionsLimit + 1, ReturnRootTxnsOnly: true})
if err != nil {
err = fmt.Errorf("txn query err %v", err)
out <- idb.TxnRow{Error: err}
Expand Down Expand Up @@ -658,9 +659,12 @@ func buildTransactionQuery(tf idb.TransactionFilter) (query string, whereArgs []
if tf.RekeyTo != nil && (*tf.RekeyTo) {
whereParts = append(whereParts, "(t.txn -> 'txn' -> 'rekey') IS NOT NULL")
}
if tf.ReturnRootTxnsOnly {
whereParts = append(whereParts, "t.txid IS NOT NULL")
}

// If returnInnerTxnOnly flag is false, then return the root transaction
if !tf.ReturnInnerTxnOnly {
if !(tf.ReturnInnerTxnOnly || tf.ReturnRootTxnsOnly) {
query = "SELECT t.round, t.intra, t.txn, root.txn, t.extra, t.asset, h.realtime FROM txn t JOIN block_header h ON t.round = h.round"
} else {
query = "SELECT t.round, t.intra, t.txn, NULL, t.extra, t.asset, h.realtime FROM txn t JOIN block_header h ON t.round = h.round"
Expand All @@ -671,7 +675,7 @@ func buildTransactionQuery(tf idb.TransactionFilter) (query string, whereArgs []
}

// join in the root transaction if the returnInnerTxnOnly flag is false
if !tf.ReturnInnerTxnOnly {
if !(tf.ReturnInnerTxnOnly || tf.ReturnRootTxnsOnly) {
query += " LEFT OUTER JOIN txn root ON t.round = root.round AND (t.extra->>'root-intra')::int = root.intra"
}

Expand Down Expand Up @@ -716,9 +720,33 @@ func (db *IndexerDb) yieldTxns(ctx context.Context, tx pgx.Tx, tf idb.Transactio
db.yieldTxnsThreadSimple(rows, out, nil, nil)
}

// txnFilterOptimization checks that there are no parameters set which would
// cause non-contiguous transaction results. As long as all transactions in a
// range are returned, we are guaranteed to fetch the root transactions, and
// therefore do not need to fetch inner transactions.
func txnFilterOptimization(tf idb.TransactionFilter) idb.TransactionFilter {
defaults := idb.TransactionFilter{
Round: tf.Round,
MinRound: tf.MinRound,
MaxRound: tf.MaxRound,
BeforeTime: tf.BeforeTime,
AfterTime: tf.AfterTime,
Limit: tf.Limit,
NextToken: tf.NextToken,
Offset: tf.Offset,
OffsetLT: tf.OffsetLT,
OffsetGT: tf.OffsetGT,
}
if reflect.DeepEqual(tf, defaults) {
tf.ReturnRootTxnsOnly = true
}
return tf
}

// Transactions is part of idb.IndexerDB
func (db *IndexerDb) Transactions(ctx context.Context, tf idb.TransactionFilter) (<-chan idb.TxnRow, uint64) {
out := make(chan idb.TxnRow, 1)
tf = txnFilterOptimization(tf)

tx, err := db.db.BeginTx(ctx, readonlyRepeatableRead)
if err != nil {
Expand Down
12 changes: 7 additions & 5 deletions idb/postgres/postgres_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1151,7 +1151,7 @@ func TestNonDisplayableUTF8(t *testing.T) {

// Test 3: transaction results properly serialized
// Transaction results also return the inner txn acfg
txnRows, _ := db.Transactions(context.Background(), idb.TransactionFilter{})
txnRows, _ := db.Transactions(context.Background(), idb.TransactionFilter{SkipOptimization: true})
num = 0
for row := range txnRows {
require.NoError(t, row.Error)
Expand Down Expand Up @@ -1948,7 +1948,8 @@ func TestBadTxnJsonEncoding(t *testing.T) {
{
offset := uint64(rootIntra)
tf := idb.TransactionFilter{
Offset: &offset,
SkipOptimization: true,
Offset: &offset,
}
rowsCh, _ := db.Transactions(context.Background(), tf)

Expand All @@ -1962,7 +1963,8 @@ func TestBadTxnJsonEncoding(t *testing.T) {
{
offset := uint64(rootIntra) + 1
tf := idb.TransactionFilter{
Offset: &offset,
SkipOptimization: true,
Offset: &offset,
}
rowsCh, _ := db.Transactions(context.Background(), tf)

Expand Down Expand Up @@ -2078,7 +2080,7 @@ func TestTransactionsTxnAhead(t *testing.T) {
require.NoError(t, err)
}
{
rowsCh, _ := db.Transactions(context.Background(), idb.TransactionFilter{})
rowsCh, _ := db.Transactions(context.Background(), idb.TransactionFilter{SkipOptimization: true})
_, ok := <-rowsCh
assert.False(t, ok)
}
Expand All @@ -2092,7 +2094,7 @@ func TestTransactionsTxnAhead(t *testing.T) {
require.NoError(t, err)
}
{
rowsCh, _ := db.Transactions(context.Background(), idb.TransactionFilter{})
rowsCh, _ := db.Transactions(context.Background(), idb.TransactionFilter{SkipOptimization: true})
row, ok := <-rowsCh
require.True(t, ok)
require.NoError(t, row.Error)
Expand Down
Loading