Skip to content

Commit

Permalink
add queryTimeout to ChainDB
Browse files Browse the repository at this point in the history
This wires most read-only queries with timeout and cancel contexts.

The new pgtimeout/T option is added to set the psql timeout with any
string accepted by time.ParseDuration.
The default is 1 hr, which is essentially unlimited in normal operation.
Time timeout is passed to ChainDB via dcrpg.DBInfo.QueryTimeout.

Add string dbtypes.PGCancelError set to the expected postgres error
message when a statement is canceled at user request.
Add timeoutError which provides a string "TIMEOUT of PostgreSQL
query after %d seconds" where %d is the set timeout in ChainDB.
Add IsTimeoutErr and IsTimeout to dbtypes.
Add replaceCancelError which replaces an error that matches the
PGCancelError with the friendly error from timeoutError.
Use replaceCancelError to filter the output of several Retrieve*
function calls.
Create dbtypes.CtxDeadlineExceeded = context.DeadlineExceeded.Error().
In dbtypes.IsTimeout, detect either CtxDeadlineExceeded or TimeoutPrefix.
Update replaceCancelError to recognize CtxDeadlineExceeded as well as
PGCancelError, replacing them both with (*ChainDB).timeoutError.
NOTE: As a general rule, where the context.WithTimeout context is
created, the error returned from the retrieval function should be filtered
with replaceCancelError.

Create (*explorerUI).timeoutErrorPage(error) bool to show an appropriate
status page if the error is a DB timeout, and return whether it was a
timeout error. Use timeoutErrorPage after each call to a function in
the explorerSource interface from a http.HandlerFunc.

Handle timeouts in APIs too.  Rename api/DataSourceAux.GetTxHistoryData
to TxHistoryData. (*ChainDB).AddressBalance now returns an error.

dev balance update error in sync is now not a sync error:
Keep going with sync if the project fund balance query times out.
This is in line with how errors from GetPgChartsData are treated
(reported but ignored).

Use context.Background for update/set functions that should not timeout.

This also includes a dcrpg bug fix in VinsForTx where input scripts were
incorrect.  This affects transactions loaded for side chain blocks. The
pkScripts were from the wrong vouts rows previously. This adds a new
query joining vins and vouts, to get the correct vouts data for the given
vins row ID. Add PkScriptByVinID and PkScriptByVoutID for clarity, calling
PkScriptByVinID from VinsForTx.

Rename explorer statuses and add timeout status:
The status type is renamed from statusType to expStatus.
The status strings of type expStatus are renamed with the prefix
ExpStatus so they are loosely grouped.
Add ExpStatusDBTimeout, which returns a 503 http status code.
Modify AddressPage to check for sql.ErrNoRows and timeout errors,
handling these appropriately. sql.ErrNoRows is not an error as this
means there are no confirmed transactions. However, a timeout
should send the user to the status page with the appropriate message.
Any other non-nil error sends the user to the status page with a generic
error message.


Other changes:
Rename (*ChainDB).RetrieveAddressSpentUnspent to AddressSpentUnspent
for consistency, and to not conflict with the function in queries.go.
Create HeightHashDB, which combines HeightDB and HashDB, and use
it instead of directly calling RetrieveBestBlockHeight.  This makes the
timeout context creation a bit more elegant and less repetitive.
Create PkScriptByID, which wraps RetrievePkScriptByID from queries.go
with ChainDB's query timeout context.
Remove the unused RetrieveAddressRecvCount function and the query
string SelectAddressRecvCount.
Document many query functions.
Rename (*ChainDB).GetBlockSummaryTimeRange to BlockSummaryTimeRange.
Rename GetTicketPoolBlockMaturity to TicketPoolBlockMaturity.
Rename GetTicketPoolByDateAndInterval to TicketPoolByDateAndInterval.
Rename GetTicketsPriceByHeight to TicketsPriceByHeight, and update
explorerDataSource interface.
Add (*ChainDB).BlockChainDbID wrapping RetrieveBlockChainDbID.
Add (*ChainDB).TicketsByPrice wrapping retrieveTicketByPrice with a
timeout context.
Add (*ChainDB).TicketsByInputCount wrapping retrieveTicketsGroupedByType
with a timeout context.
Add CoinSupplyChartsData wrapping retrieveCoinSupply with timeout.
Add DbTxByHash wrapping RetrieveDbTxByHash.
Add FundingOutpointIndxByVinID wrapping RetrieveFundingOutpointIndxByVinID.
Improve function names by removing get, retrieve, etc.
Rename RetrieveTicketIDByHash to RetrieveTicketIDByHashNoCancel
to indicate that the function should never be modified to accept a
cancellable context.
Create BlockFlagsNoCancel, a variation on BlockFlags.
Create BlockChainDbIDNoCancel, a variation on BlockChainDbID.
Use these two new functiOns in UpdateLastBlock.
Add an error return to several insight-related queries.
Lots of insight clean-up.
Contextify GetAddressMetrics and retrieveAddressTxsCount.
Add (*ChainDB).NumAddressIntervals, wrapping retrieveAddressTxsCount
and TimeBasedGroupingToInterval.
Add dbtypes.TimeIntervals, a slice of the day/week/month/year intervals.
Do not display sync tx processing rate when none processed
Rename (*ChainDB).GetAddressMetrics to AddressMetrics.
  • Loading branch information
chappjc authored Nov 16, 2018
1 parent e766982 commit d834d32
Show file tree
Hide file tree
Showing 21 changed files with 1,221 additions and 596 deletions.
53 changes: 45 additions & 8 deletions api/apiroutes.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type DataSourceAux interface {
txnType dbtypes.AddrTxnType) (*apitypes.Address, error)
AddressTotals(address string) (*apitypes.AddressTotals, error)
VotesInBlock(hash string) (int16, error)
GetTxHistoryData(address string, addrChart dbtypes.HistoryChart,
TxHistoryData(address string, addrChart dbtypes.HistoryChart,
chartGroupings dbtypes.TimeBasedGrouping) (*dbtypes.ChartsData, error)
TicketPoolVisualization(interval dbtypes.TimeBasedGrouping) (
[]*dbtypes.PoolTicketsData, *dbtypes.PoolTicketsData, uint64, error)
Expand All @@ -118,8 +118,8 @@ func NewContext(client *rpcclient.Client, params *chaincfg.Params, dataSource Da
conns, _ := client.GetConnectionCount()
nodeHeight, _ := client.GetBlockCount()

// explorerDataSource is an interface that could have a value of pointer
// type, and if either is nil this means lite mode.
// auxDataSource is an interface that could have a value of pointer type,
// and if either is nil this means lite mode.
liteMode := auxDataSource == nil || reflect.ValueOf(auxDataSource).IsNil()

return &appContext{
Expand Down Expand Up @@ -428,6 +428,9 @@ func (c *appContext) setOutputSpends(txid string, vouts []apitypes.Vout) error {
// For each output of this transaction, look up any spending transactions,
// and the index of the spending transaction input.
spendHashes, spendVinInds, voutInds, err := c.AuxDataSource.SpendingTransactions(txid)
if dbtypes.IsTimeoutErr(err) {
return fmt.Errorf("SpendingTransactions: %v", err)
}
if err != nil && err != sql.ErrNoRows {
return fmt.Errorf("unable to get spending transaction info for outputs of %s", txid)
}
Expand Down Expand Up @@ -835,9 +838,14 @@ func (c *appContext) getTicketPoolByDate(w http.ResponseWriter, r *http.Request)
// needed by this request.
interval := dbtypes.TimeGroupingFromStr(tp)
barCharts, _, height, err := c.AuxDataSource.TicketPoolVisualization(interval)
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("TicketPoolVisualization: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil {
apiLog.Errorf("Unable to get ticket pool by date: %v", err)
http.Error(w, http.StatusText(422), 422)
http.Error(w, http.StatusText(http.StatusUnprocessableEntity), http.StatusUnprocessableEntity)
return
}

Expand Down Expand Up @@ -887,6 +895,11 @@ func (c *appContext) blockSubsidies(w http.ResponseWriter, r *http.Request) {
if hash != "" {
var err error
numVotes, err = c.AuxDataSource.VotesInBlock(hash)
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("VotesInBlock: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil {
http.NotFound(w, r)
return
Expand Down Expand Up @@ -1215,6 +1228,11 @@ func (c *appContext) addressTotals(w http.ResponseWriter, r *http.Request) {
}

totals, err := c.AuxDataSource.AddressTotals(address)
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("AddressTotals: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil {
log.Warnf("failed to get address totals (%s): %v", address, err)
http.Error(w, http.StatusText(422), 422)
Expand All @@ -1237,8 +1255,13 @@ func (c *appContext) getAddressTxTypesData(w http.ResponseWriter, r *http.Reques
return
}

data, err := c.AuxDataSource.GetTxHistoryData(address, dbtypes.TxsType,
data, err := c.AuxDataSource.TxHistoryData(address, dbtypes.TxsType,
dbtypes.TimeGroupingFromStr(chartGrouping))
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("TxHistoryData: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil {
log.Warnf("failed to get address (%s) history by tx type : %v", address, err)
http.Error(w, http.StatusText(422), 422)
Expand All @@ -1261,8 +1284,13 @@ func (c *appContext) getAddressTxAmountFlowData(w http.ResponseWriter, r *http.R
return
}

data, err := c.AuxDataSource.GetTxHistoryData(address, dbtypes.AmountFlow,
data, err := c.AuxDataSource.TxHistoryData(address, dbtypes.AmountFlow,
dbtypes.TimeGroupingFromStr(chartGrouping))
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("TxHistoryData: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil {
log.Warnf("failed to get address (%s) history by amount flow: %v", address, err)
http.Error(w, http.StatusText(422), 422)
Expand All @@ -1285,8 +1313,13 @@ func (c *appContext) getAddressTxUnspentAmountData(w http.ResponseWriter, r *htt
return
}

data, err := c.AuxDataSource.GetTxHistoryData(address, dbtypes.TotalUnspent,
data, err := c.AuxDataSource.TxHistoryData(address, dbtypes.TotalUnspent,
dbtypes.TimeGroupingFromStr(chartGrouping))
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("TxHistoryData: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil {
log.Warnf("failed to get address (%s) history by unspent amount flow: %v", address, err)
http.Error(w, http.StatusText(422), 422)
Expand Down Expand Up @@ -1353,8 +1386,12 @@ func (c *appContext) getAddressTransactions(w http.ResponseWriter, r *http.Reque
txs = c.BlockData.GetAddressTransactionsWithSkip(address, int(count), int(skip))
} else {
txs, err = c.AuxDataSource.AddressTransactionDetails(address, count, skip, dbtypes.AddrTxnAll)
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("AddressTransactionDetails: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
}

if txs == nil || err != nil {
http.Error(w, http.StatusText(422), 422)
return
Expand Down
113 changes: 98 additions & 15 deletions api/insight/apiroutes.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ func (c *insightApiContext) getBlockSummary(w http.ResponseWriter, r *http.Reque
}
var err error
hash, err = c.BlockData.ChainDB.GetBlockHash(int64(idx))
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("GetBlockHash: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil {
writeInsightError(w, "Unable to get block hash from index")
return
Expand Down Expand Up @@ -196,6 +201,11 @@ func (c *insightApiContext) getBlockHash(w http.ResponseWriter, r *http.Request)
return
}
hash, err := c.BlockData.ChainDB.GetBlockHash(int64(idx))
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("GetBlockHash: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil || hash == "" {
writeInsightNotFound(w, "Not found")
return
Expand Down Expand Up @@ -229,6 +239,11 @@ func (c *insightApiContext) getRawBlock(w http.ResponseWriter, r *http.Request)
}
var err error
hash, err = c.BlockData.ChainDB.GetBlockHash(int64(idx))
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("GetBlockHash: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil {
writeInsightError(w, "Unable to get block hash from index")
return
Expand Down Expand Up @@ -305,12 +320,21 @@ func (c *insightApiContext) getAddressesTxnOutput(w http.ResponseWriter, r *http
txnOutputs := make([]apitypes.AddressTxnOutput, 0)

for _, address := range addresses {

confirmedTxnOutputs := c.BlockData.ChainDB.GetAddressUTXO(address)
confirmedTxnOutputs, err := c.BlockData.ChainDB.AddressUTXO(address)
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("AddressUTXO: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil {
apiLog.Errorf("Error getting UTXOs: %v", err)
continue
}

addressOuts, _, err := c.MemPool.UnconfirmedTxnsForAddress(address)
if err != nil {
apiLog.Errorf("Error in getting unconfirmed transactions")
apiLog.Errorf("Error getting unconfirmed transactions: %v", err)
continue
}

if addressOuts != nil {
Expand Down Expand Up @@ -445,13 +469,25 @@ func (c *insightApiContext) getTransactions(w http.ResponseWriter, r *http.Reque
return
}
addresses := []string{address}
rawTxs, recentTxs := c.BlockData.ChainDB.InsightPgGetAddressTransactions(addresses, int64(c.Status.Height-2))
rawTxs, recentTxs, err :=
c.BlockData.ChainDB.InsightAddressTransactions(addresses, int64(c.Status.Height-2))
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("InsightAddressTransactions: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil {
writeInsightError(w,
fmt.Sprintf("Error retrieving transactions for addresss %s (%v)",
addresses, err))
return
}

addressOuts, _, err := c.MemPool.UnconfirmedTxnsForAddress(address)
UnconfirmedTxs := []string{}

if err != nil {
writeInsightError(w, fmt.Sprintf("Error gathering mempool transactions (%s)", err))
writeInsightError(w, fmt.Sprintf("Error gathering mempool transactions (%v)", err))
return
}

Expand Down Expand Up @@ -536,7 +572,19 @@ func (c *insightApiContext) getAddressesTxn(w http.ResponseWriter, r *http.Reque
addressOutput := new(apitypes.InsightMultiAddrsTxOutput)
UnconfirmedTxs := []string{}

rawTxs, recentTxs := c.BlockData.ChainDB.InsightPgGetAddressTransactions(addresses, int64(c.Status.Height-2))
rawTxs, recentTxs, err :=
c.BlockData.ChainDB.InsightAddressTransactions(addresses, int64(c.Status.Height-2))
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("InsightAddressTransactions: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil {
writeInsightError(w,
fmt.Sprintf("Error retrieving transactions for addresss %s (%s)",
addresses, err))
return
}

// Confirm all addresses are valid and pull unconfirmed transactions for all addresses
for _, addr := range addresses {
Expand Down Expand Up @@ -639,8 +687,14 @@ func (c *insightApiContext) getAddressBalance(w http.ResponseWriter, r *http.Req
return
}

addressInfo := c.BlockData.ChainDB.GetAddressBalance(address, 20, 0)
if addressInfo == nil {
addressInfo, err := c.BlockData.ChainDB.AddressBalance(address, 20, 0)
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("AddressBalance: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil || addressInfo == nil {
apiLog.Warnf("AddressBalance: %v", err)
http.Error(w, http.StatusText(422), 422)
return
}
Expand Down Expand Up @@ -811,11 +865,20 @@ func (c *insightApiContext) getBlockSummaryByTime(w http.ResponseWriter, r *http
summaryOutput.Pagination.CurrentTs = maxTime
summaryOutput.Pagination.MoreTs = maxTime

blockSummary := c.BlockData.ChainDB.GetBlockSummaryTimeRange(minTime, maxTime, 0)
blockSummary, err := c.BlockData.ChainDB.BlockSummaryTimeRange(minTime, maxTime, 0)
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("BlockSummaryTimeRange: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil {
writeInsightError(w, fmt.Sprintf("Unable to retrieve block summaries: %v", err))
return
}

outputBlockSummary := []dbtypes.BlockDataBasic{}

// Generate the pagenation parameters more and moreTs and limit the result
// Generate the pagination parameters More and MoreTs and limit the result.
if limit > 0 {
for i, block := range blockSummary {
if i >= limit {
Expand Down Expand Up @@ -861,8 +924,15 @@ func (c *insightApiContext) getAddressInfo(w http.ResponseWriter, r *http.Reques

// Get Confirmed Balances
var unconfirmedBalanceSat int64
_, _, totalSpent, totalUnspent, _, err := c.BlockData.ChainDB.RetrieveAddressSpentUnspent(address)
_, _, totalSpent, totalUnspent, _, err := c.BlockData.ChainDB.AddressSpentUnspent(address)
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("AddressSpentUnspent: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil {
apiLog.Errorf("AddressSpentUnspent: %v", err)
http.Error(w, "Unexpected error retrieving address info.", http.StatusInternalServerError)
return
}

Expand All @@ -882,11 +952,24 @@ func (c *insightApiContext) getAddressInfo(w http.ResponseWriter, r *http.Reques

addresses := []string{address}

// Get Confirmed Transactions
rawTxs, recentTxs := c.BlockData.ChainDB.InsightPgGetAddressTransactions(addresses, int64(c.Status.Height-2))
// Get confirmed transactions.
rawTxs, recentTxs, err :=
c.BlockData.ChainDB.InsightAddressTransactions(addresses, int64(c.Status.Height-2))
if dbtypes.IsTimeoutErr(err) {
apiLog.Errorf("InsightAddressTransactions: %v", err)
http.Error(w, "Database timeout.", http.StatusServiceUnavailable)
return
}
if err != nil {
apiLog.Errorf("Error retrieving transactions for addresss %s: %v",
addresses, err)
http.Error(w, "Error retrieving transactions for that addresss.",
http.StatusInternalServerError)
return
}
confirmedTxCount := len(rawTxs)

// Get Unconfirmed Transactions
// Get unconfirmed transactions.
unconfirmedTxs := []string{}
addressOuts, _, err := c.MemPool.UnconfirmedTxnsForAddress(address)
if err != nil {
Expand All @@ -895,7 +978,7 @@ func (c *insightApiContext) getAddressInfo(w http.ResponseWriter, r *http.Reques
if addressOuts != nil {
FUNDING_TX_DUPLICATE_CHECK:
for _, f := range addressOuts.Outpoints {
// Confirm its not already in our recent transactions
// Confirm it's not already in our recent transactions.
for _, v := range recentTxs {
if v == f.Hash.String() {
continue FUNDING_TX_DUPLICATE_CHECK
Expand Down
Loading

0 comments on commit d834d32

Please sign in to comment.