From 8c4973016a3137a2c8765a60f7b26add60516777 Mon Sep 17 00:00:00 2001 From: Eric Warehime Date: Tue, 27 Dec 2022 13:17:53 -0800 Subject: [PATCH 1/5] REST API: Add compression to REST API (#1390) (#1394) --- api/handlers_e2e_test.go | 192 ++++++++++++++++++++++++++++++++------- api/server.go | 8 ++ 2 files changed, 166 insertions(+), 34 deletions(-) diff --git a/api/handlers_e2e_test.go b/api/handlers_e2e_test.go index ad099c5d8..e8d276014 100644 --- a/api/handlers_e2e_test.go +++ b/api/handlers_e2e_test.go @@ -1,6 +1,8 @@ package api import ( + "bytes" + "compress/gzip" "context" "encoding/base64" "fmt" @@ -476,23 +478,7 @@ func TestAccountMaxResultsLimit(t *testing.T) { listenAddr := "localhost:8989" go Serve(serverCtx, listenAddr, db, nil, logrus.New(), opts) - // wait at most a few seconds for server to come up - serverUp := false - for maxWait := 3 * time.Second; !serverUp && maxWait > 0; maxWait -= 50 * time.Millisecond { - time.Sleep(50 * time.Millisecond) - resp, err := http.Get("http://" + listenAddr + "/health") - if err != nil { - t.Log("waiting for server:", err) - continue - } - resp.Body.Close() - if resp.StatusCode != http.StatusOK { - t.Log("waiting for server OK:", resp.StatusCode) - continue - } - serverUp = true // server is up now - } - require.True(t, serverUp, "api.Serve did not start server in time") + waitForServer(t, listenAddr) // make a real HTTP request (to additionally test generated param parsing logic) makeReq := func(t *testing.T, path string, exclude []string, includeDeleted bool, next *string, limit *uint64) (*http.Response, []byte) { @@ -1594,23 +1580,7 @@ func TestGetBlocksTransactionsLimit(t *testing.T) { listenAddr := "localhost:8888" go Serve(serverCtx, listenAddr, db, nil, logrus.New(), opts) - // wait at most a few seconds for server to come up - serverUp := false - for maxWait := 3 * time.Second; !serverUp && maxWait > 0; maxWait -= 50 * time.Millisecond { - time.Sleep(50 * time.Millisecond) - resp, err := http.Get("http://" + listenAddr + "/health") - if err != nil { - t.Log("waiting for server:", err) - continue - } - resp.Body.Close() - if resp.StatusCode != http.StatusOK { - t.Log("waiting for server OK:", resp.StatusCode) - continue - } - serverUp = true // server is up now - } - require.True(t, serverUp, "api.Serve did not start server in time") + waitForServer(t, listenAddr) // make a real HTTP request (to additionally test generated param parsing logic) makeReq := func(t *testing.T, path string, headerOnly bool) (*http.Response, []byte) { @@ -1666,6 +1636,160 @@ func TestGetBlocksTransactionsLimit(t *testing.T) { } } +func TestGetBlockWithCompression(t *testing.T) { + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis()) + defer shutdownFunc() + defer l.Close() + + /////////// + // Given // A block containing 20 transactions at round 1 + // // + /////////// + + const numbOfTxns = 20 + var txns []transactions.SignedTxnWithAD + for j := 0; j < numbOfTxns; j++ { + txns = append(txns, test.MakePaymentTxn(1, 100, 0, 0, 0, 0, test.AccountA, test.AccountB, basics.Address{}, basics.Address{})) + } + ptxns := make([]*transactions.SignedTxnWithAD, numbOfTxns) + for k := range txns { + ptxns[k] = &txns[k] + } + block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, ptxns...) + block.BlockHeader.Round = basics.Round(1) + require.NoError(t, err) + + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) + require.NoError(t, err) + + ////////// + // When // We look up a block using a ServerImplementation with a compression flag on/off + ////////// + + serverCtx, serverCancel := context.WithCancel(context.Background()) + defer serverCancel() + opts := defaultOpts + listenAddr := "localhost:8889" + go Serve(serverCtx, listenAddr, db, nil, logrus.New(), opts) + + waitForServer(t, listenAddr) + + getBlockFunc := func(t *testing.T, headerOnly bool, useCompression bool) *generated.BlockResponse { + path := "/v2/blocks/1" + + client := &http.Client{} + req, err := http.NewRequest("GET", "http://"+listenAddr+path, nil) + require.NoError(t, err) + q := req.URL.Query() + if headerOnly { + q.Add("header-only", "true") + } + if useCompression { + req.Header.Add(echo.HeaderAcceptEncoding, "gzip") + } + req.URL.RawQuery = q.Encode() + t.Log("making HTTP request path", req.URL) + resp, err := client.Do(req) + require.NoError(t, err) + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode, fmt.Sprintf("unexpected return code, body: %s", string(body))) + + var response generated.BlockResponse + if useCompression { + require.Equal(t, resp.Header.Get(echo.HeaderContentEncoding), "gzip") + reader, err := gzip.NewReader(bytes.NewReader(body)) + require.NoError(t, err) + + output, e2 := ioutil.ReadAll(reader) + require.NoError(t, e2) + + body = output + } + err = json.Decode(body, &response) + require.NoError(t, err) + + return &response + } + + ////////// + // Then // Get the same block content compared to uncompress block + ////////// + notCompressedBlock := getBlockFunc(t, false, false) + compressedBlock := getBlockFunc(t, false, true) + require.Equal(t, notCompressedBlock, compressedBlock) + require.Equal(t, len(*notCompressedBlock.Transactions), numbOfTxns) + + // we now make sure that compression flag works with other flags. + notCompressedBlock = getBlockFunc(t, true, false) + compressedBlock = getBlockFunc(t, true, true) + require.Equal(t, len(*notCompressedBlock.Transactions), 0) +} + +func TestNoCompressionSupportForNonBlockAPI(t *testing.T) { + db, shutdownFunc, _, l := setupIdb(t, test.MakeGenesis()) + defer shutdownFunc() + defer l.Close() + + ////////// + // When // we call the health endpoint using compression flag on + ////////// + + serverCtx, serverCancel := context.WithCancel(context.Background()) + defer serverCancel() + opts := defaultOpts + listenAddr := "localhost:8887" + go Serve(serverCtx, listenAddr, db, nil, logrus.New(), opts) + + waitForServer(t, listenAddr) + + path := "/health" + client := &http.Client{} + req, err := http.NewRequest("GET", "http://"+listenAddr+path, nil) + require.NoError(t, err) + req.Header.Add(echo.HeaderAcceptEncoding, "gzip") + + t.Log("making HTTP request path", req.URL) + + resp, err := client.Do(req) + require.NoError(t, err) + + ////////// + // Then // We expect the result not to be compressed. + ////////// + + require.Equal(t, resp.Header.Get(echo.HeaderContentEncoding), "") + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode, fmt.Sprintf("unexpected return code, body: %s", string(body))) + var response generated.HealthCheckResponse + err = json.Decode(body, &response) + require.NoError(t, err) +} + +func waitForServer(t *testing.T, listenAddr string) { + // wait at most a few seconds for server to come up + serverUp := false + for maxWait := 3 * time.Second; !serverUp && maxWait > 0; maxWait -= 50 * time.Millisecond { + time.Sleep(50 * time.Millisecond) + resp, err := http.Get("http://" + listenAddr + "/health") + if err != nil { + t.Log("waiting for server:", err) + continue + } + resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Log("waiting for server OK:", resp.StatusCode) + continue + } + serverUp = true // server is up now + } + require.True(t, serverUp, "api.Serve did not start server in time") +} + // compareAppBoxesAgainstHandler is of type BoxTestComparator func compareAppBoxesAgainstHandler(t *testing.T, db *postgres.IndexerDb, appBoxes map[basics.AppIndex]map[string]string, deletedBoxes map[basics.AppIndex]map[string]bool, verifyTotals bool) { diff --git a/api/server.go b/api/server.go index f5d9e920d..1b4d794a3 100644 --- a/api/server.go +++ b/api/server.go @@ -4,6 +4,7 @@ import ( "context" "net" "net/http" + "strings" "time" echo_contrib "github.com/labstack/echo-contrib/prometheus" @@ -104,6 +105,13 @@ func Serve(ctx context.Context, serveAddr string, db idb.IndexerDb, fetcherError e.Use(middlewares.MakeLogger(log)) e.Use(middleware.CORS()) + e.Use(middleware.GzipWithConfig(middleware.GzipConfig{ + // we currently support compressed result only for GET /v2/blocks/ API + Skipper: func(c echo.Context) bool { + return !strings.Contains(c.Path(), "/v2/blocks/") + }, + Level: -1, + })) middleware := make([]echo.MiddlewareFunc, 0) From 93794356251d4e79be628298192075ab0a809a0c Mon Sep 17 00:00:00 2001 From: Eric Warehime Date: Tue, 3 Jan 2023 15:43:07 -0800 Subject: [PATCH 2/5] API: Hotfix block endpoint (#1397) --- api/handlers.go | 5 ----- idb/idb.go | 4 ++++ idb/postgres/postgres.go | 9 ++++++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/api/handlers.go b/api/handlers.go index e7dd659f4..b0d7f08a0 100644 --- a/api/handlers.go +++ b/api/handlers.go @@ -1359,11 +1359,6 @@ func (si *ServerImplementation) fetchBlock(ctx context.Context, round uint64, op results := make([]generated.Transaction, 0) for _, txrow := range transactions { - // Do not include inner transactions. - if txrow.RootTxn != nil { - continue - } - tx, err := txnRowToTransaction(txrow) if err != nil { return err diff --git a/idb/idb.go b/idb/idb.go index 018c6c445..b7dd34430 100644 --- a/idb/idb.go +++ b/idb/idb.go @@ -240,6 +240,10 @@ type TransactionFilter struct { // instead of the root txn. ReturnInnerTxnOnly bool + // If this flag is set to true, then the query returns only root txns + // and no inner txns + ReturnRootTxnsOnly bool + // If this flag is set to true, then the query returns the block excluding // the transactions HeaderOnly bool diff --git a/idb/postgres/postgres.go b/idb/postgres/postgres.go index 686584b19..b9ae15af5 100644 --- a/idb/postgres/postgres.go +++ b/idb/postgres/postgres.go @@ -457,7 +457,7 @@ func (db *IndexerDb) GetBlock(ctx context.Context, round uint64, options idb.Get if options.Transactions { out := make(chan idb.TxnRow, 1) - query, whereArgs, err := buildTransactionQuery(idb.TransactionFilter{Round: &round, Limit: options.MaxTransactionsLimit + 1}) + query, whereArgs, err := buildTransactionQuery(idb.TransactionFilter{Round: &round, Limit: options.MaxTransactionsLimit + 1, ReturnRootTxnsOnly: true}) if err != nil { err = fmt.Errorf("txn query err %v", err) out <- idb.TxnRow{Error: err} @@ -658,9 +658,12 @@ func buildTransactionQuery(tf idb.TransactionFilter) (query string, whereArgs [] if tf.RekeyTo != nil && (*tf.RekeyTo) { whereParts = append(whereParts, "(t.txn -> 'txn' -> 'rekey') IS NOT NULL") } + if tf.ReturnRootTxnsOnly { + whereParts = append(whereParts, "t.txid IS NOT NULL") + } // If returnInnerTxnOnly flag is false, then return the root transaction - if !tf.ReturnInnerTxnOnly { + if !(tf.ReturnInnerTxnOnly || tf.ReturnRootTxnsOnly) { query = "SELECT t.round, t.intra, t.txn, root.txn, t.extra, t.asset, h.realtime FROM txn t JOIN block_header h ON t.round = h.round" } else { query = "SELECT t.round, t.intra, t.txn, NULL, t.extra, t.asset, h.realtime FROM txn t JOIN block_header h ON t.round = h.round" @@ -671,7 +674,7 @@ func buildTransactionQuery(tf idb.TransactionFilter) (query string, whereArgs [] } // join in the root transaction if the returnInnerTxnOnly flag is false - if !tf.ReturnInnerTxnOnly { + if !(tf.ReturnInnerTxnOnly || tf.ReturnRootTxnsOnly) { query += " LEFT OUTER JOIN txn root ON t.round = root.round AND (t.extra->>'root-intra')::int = root.intra" } From b02b5ff2afe2b074fd6fe9b3a4014661a3047be7 Mon Sep 17 00:00:00 2001 From: DevOps Service Date: Tue, 3 Jan 2023 23:48:56 +0000 Subject: [PATCH 3/5] Bump version to 2.15.1-rc1 --- .version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.version b/.version index 68e69e405..c486e45d1 100644 --- a/.version +++ b/.version @@ -1 +1 @@ -2.15.0 +2.15.1-rc1 From c7b3b78c2ab6ff8a0a0d0216b35a90e2b1bbdd20 Mon Sep 17 00:00:00 2001 From: Will Winder Date: Wed, 4 Jan 2023 18:13:52 -0500 Subject: [PATCH 4/5] API: Skip inner transaction fetching when possible. (#1402) * Skip inner transaction fetching when possible. * Add a unit test. * Add an option to allow skipping the optimization. --- idb/idb.go | 3 ++ idb/postgres/postgres.go | 25 ++++++++++ idb/postgres/postgres_integration_test.go | 12 +++-- idb/postgres/postgres_test.go | 56 +++++++++++++++++++++++ 4 files changed, 91 insertions(+), 5 deletions(-) create mode 100644 idb/postgres/postgres_test.go diff --git a/idb/idb.go b/idb/idb.go index b7dd34430..413a66a16 100644 --- a/idb/idb.go +++ b/idb/idb.go @@ -198,6 +198,9 @@ type GetBlockOptions struct { // TransactionFilter is a parameter object with all the transaction filter options. type TransactionFilter struct { + // SkipOptimization is used for testing to ensure the parameters are not modified. + SkipOptimization bool + // Address filtering transactions for one Address will // return transactions newest-first proceding into the // past. Paging through such results can be achieved by diff --git a/idb/postgres/postgres.go b/idb/postgres/postgres.go index b9ae15af5..cfc78395e 100644 --- a/idb/postgres/postgres.go +++ b/idb/postgres/postgres.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "os" + "reflect" "strings" "sync" "time" @@ -719,9 +720,33 @@ func (db *IndexerDb) yieldTxns(ctx context.Context, tx pgx.Tx, tf idb.Transactio db.yieldTxnsThreadSimple(rows, out, nil, nil) } +// txnFilterOptimization checks that there are no parameters set which would +// cause non-contiguous transaction results. As long as all transactions in a +// range are returned, we are guaranteed to fetch the root transactions, and +// therefore do not need to fetch inner transactions. +func txnFilterOptimization(tf idb.TransactionFilter) idb.TransactionFilter { + defaults := idb.TransactionFilter{ + Round: tf.Round, + MinRound: tf.MinRound, + MaxRound: tf.MaxRound, + BeforeTime: tf.BeforeTime, + AfterTime: tf.AfterTime, + Limit: tf.Limit, + NextToken: tf.NextToken, + Offset: tf.Offset, + OffsetLT: tf.OffsetLT, + OffsetGT: tf.OffsetGT, + } + if reflect.DeepEqual(tf, defaults) { + tf.ReturnRootTxnsOnly = true + } + return tf +} + // Transactions is part of idb.IndexerDB func (db *IndexerDb) Transactions(ctx context.Context, tf idb.TransactionFilter) (<-chan idb.TxnRow, uint64) { out := make(chan idb.TxnRow, 1) + tf = txnFilterOptimization(tf) tx, err := db.db.BeginTx(ctx, readonlyRepeatableRead) if err != nil { diff --git a/idb/postgres/postgres_integration_test.go b/idb/postgres/postgres_integration_test.go index 7ce7e1c66..aba7791da 100644 --- a/idb/postgres/postgres_integration_test.go +++ b/idb/postgres/postgres_integration_test.go @@ -1151,7 +1151,7 @@ func TestNonDisplayableUTF8(t *testing.T) { // Test 3: transaction results properly serialized // Transaction results also return the inner txn acfg - txnRows, _ := db.Transactions(context.Background(), idb.TransactionFilter{}) + txnRows, _ := db.Transactions(context.Background(), idb.TransactionFilter{SkipOptimization: true}) num = 0 for row := range txnRows { require.NoError(t, row.Error) @@ -1948,7 +1948,8 @@ func TestBadTxnJsonEncoding(t *testing.T) { { offset := uint64(rootIntra) tf := idb.TransactionFilter{ - Offset: &offset, + SkipOptimization: true, + Offset: &offset, } rowsCh, _ := db.Transactions(context.Background(), tf) @@ -1962,7 +1963,8 @@ func TestBadTxnJsonEncoding(t *testing.T) { { offset := uint64(rootIntra) + 1 tf := idb.TransactionFilter{ - Offset: &offset, + SkipOptimization: true, + Offset: &offset, } rowsCh, _ := db.Transactions(context.Background(), tf) @@ -2078,7 +2080,7 @@ func TestTransactionsTxnAhead(t *testing.T) { require.NoError(t, err) } { - rowsCh, _ := db.Transactions(context.Background(), idb.TransactionFilter{}) + rowsCh, _ := db.Transactions(context.Background(), idb.TransactionFilter{SkipOptimization: true}) _, ok := <-rowsCh assert.False(t, ok) } @@ -2092,7 +2094,7 @@ func TestTransactionsTxnAhead(t *testing.T) { require.NoError(t, err) } { - rowsCh, _ := db.Transactions(context.Background(), idb.TransactionFilter{}) + rowsCh, _ := db.Transactions(context.Background(), idb.TransactionFilter{SkipOptimization: true}) row, ok := <-rowsCh require.True(t, ok) require.NoError(t, row.Error) diff --git a/idb/postgres/postgres_test.go b/idb/postgres/postgres_test.go new file mode 100644 index 000000000..cd2d6bdd1 --- /dev/null +++ b/idb/postgres/postgres_test.go @@ -0,0 +1,56 @@ +package postgres + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/algorand/indexer/idb" +) + +func Test_txnFilterOptimization(t *testing.T) { + tests := []struct { + name string + arg idb.TransactionFilter + rootOnly bool + }{ + { + name: "basic", + arg: idb.TransactionFilter{}, + rootOnly: true, + }, + { + name: "rounds", + arg: idb.TransactionFilter{MinRound: 100, MaxRound: 101, Limit: 100}, + rootOnly: true, + }, + { + name: "date", + arg: idb.TransactionFilter{AfterTime: time.Unix(100000, 100), Limit: 100}, + rootOnly: true, + }, + { + name: "token", + arg: idb.TransactionFilter{NextToken: "test", Limit: 100}, + rootOnly: true, + }, + { + name: "address", + arg: idb.TransactionFilter{Address: []byte{0x10, 0x11, 0x12}, Limit: 100}, + rootOnly: false, + }, + { + name: "type", + arg: idb.TransactionFilter{TypeEnum: idb.TypeEnumPay, Limit: 100}, + rootOnly: false, + }, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("%s(%t)", tt.name, tt.rootOnly), func(t *testing.T) { + optimized := txnFilterOptimization(tt.arg) + assert.Equal(t, tt.rootOnly, optimized.ReturnRootTxnsOnly) + }) + } +} From e1004f622578ce6f9f3a3530ce57903e10dd699b Mon Sep 17 00:00:00 2001 From: DevOps Service Date: Fri, 6 Jan 2023 15:43:36 +0000 Subject: [PATCH 5/5] Bump version to 2.15.1 --- .version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.version b/.version index c486e45d1..3b1fc7950 100644 --- a/.version +++ b/.version @@ -1 +1 @@ -2.15.1-rc1 +2.15.1