-
Notifications
You must be signed in to change notification settings - Fork 482
Implement PreprocessTxs #21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
4815e2a
a829d80
9afafdb
04cbae1
a41e189
1203b18
e0e7a86
6abdee4
7295885
3578791
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,161 @@ | ||
| package app | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "errors" | ||
| "fmt" | ||
| "sort" | ||
|
|
||
| sdk "github.com/cosmos/cosmos-sdk/types" | ||
| "github.com/lazyledger/lazyledger-app/x/lazyledgerapp/types" | ||
| abci "github.com/lazyledger/lazyledger-core/abci/types" | ||
| core "github.com/lazyledger/lazyledger-core/proto/tendermint/types" | ||
| ) | ||
|
|
||
| const ( | ||
| // runTxModeDeliver determines if the tx being ran should only be checked, | ||
| // or if it should actually modify the state. | ||
| // copy and pasted from the sdk, as it is not exported | ||
| // https://github.com/lazyledger/cosmos-sdk/blob/3addd7f65d1c35f76716d67641ee629b01f5b9c7/baseapp/baseapp.go#L28 | ||
| runTxModeDeliver = 3 | ||
| ) | ||
|
|
||
| // This file should contain all of the altered ABCI methods | ||
|
|
||
| // PreprocessTxs fullfills the lazyledger-core version of the ACBI interface, by | ||
| // performing basic validation for the incoming txs, and by cleanly separating | ||
| // share messages from transactions | ||
| // todo(evan): refactor out a for loop. | ||
| func (app *App) PreprocessTxs(txs abci.RequestPreprocessTxs) abci.ResponsePreprocessTxs { | ||
| squareSize := app.SquareSize() | ||
| shareCounter := uint64(0) | ||
| var shareMsgs []*core.Message | ||
| var processedTxs [][]byte | ||
| for _, rawTx := range txs.Txs { | ||
| // decode the Tx | ||
| tx, err := app.txConfig.TxDecoder()(rawTx) | ||
| if err != nil { | ||
| continue | ||
| } | ||
|
|
||
| // don't process the tx if the transaction doesn't contain a | ||
| // PayForMessage sdk.Msg | ||
| if !hasWirePayForMessage(tx) { | ||
| processedTxs = append(processedTxs, rawTx) | ||
| continue | ||
| } | ||
|
|
||
| // only support transactions that contain a single sdk.Msg | ||
| if len(tx.GetMsgs()) != 1 { | ||
| continue | ||
| } | ||
|
|
||
| msg := tx.GetMsgs()[0] | ||
|
|
||
| // run basic validation on the transaction | ||
| err = tx.ValidateBasic() | ||
| if err != nil { | ||
| continue | ||
| } | ||
|
|
||
| // process the message | ||
| coreMsg, signedTx, err := app.processMsg(msg) | ||
| if err != nil { | ||
| continue | ||
| } | ||
|
|
||
| // execute the tx in runTxModeDeliver mode (3) | ||
| // execution includes all validation checks burning fees | ||
| // currently, no fees are burned | ||
| _, _, err = app.BaseApp.TxRunner()(runTxModeDeliver, rawTx) | ||
| if err != nil { | ||
| continue | ||
| } | ||
|
|
||
| // increment the share counter by the number of shares taken by the message | ||
| sharesTaken := uint64(len(coreMsg.Data) / types.ShareSize) | ||
| shareCounter += sharesTaken | ||
|
|
||
| // if there are too many shares stop processing and return the transactions | ||
| if shareCounter > squareSize*squareSize { | ||
| break | ||
| } | ||
|
|
||
| // encode the processed tx | ||
| rawProcessedTx, err := app.appCodec.MarshalBinaryBare(signedTx) | ||
| if err != nil { | ||
| continue | ||
| } | ||
|
|
||
| // add the message and tx to the output | ||
| shareMsgs = append(shareMsgs, &coreMsg) | ||
| processedTxs = append(processedTxs, rawProcessedTx) | ||
| } | ||
|
|
||
| // sort messages lexigraphically | ||
| sort.Slice(shareMsgs, func(i, j int) bool { | ||
| return bytes.Compare(shareMsgs[i].NamespaceId, shareMsgs[j].NamespaceId) < 0 | ||
| }) | ||
|
|
||
| return abci.ResponsePreprocessTxs{ | ||
| Txs: processedTxs, | ||
| Messages: &core.Messages{MessagesList: shareMsgs}, | ||
| } | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that in PreProcess the app also needs to track the (original / mempool) Tx that were processed, s.t. on re-check it can tell ll-core to remove them from the mempool. This has the caveat that currently only the proposer will call preprocess: the app needs to know if a proposed block didn't make it through consensus (e.g. because of timeouts) and other nodes to be able to evict their mempools too...
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not necessarily? Conflicting transactions (which includes duplicates in the mempool) between the block and the mempool are removed with CheckTx, so PreProcess doesn't technically need to do that?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like I'm confused about this again, then 🤔 Sure, it "sees" the Tx during each deliver Tx but only the already processed which are different from the mempool Tx potenially.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Leaving this here for reference (quoting @adlerjohn):
So my concern was the mapping between mempool tx <-> block tx but as mentioned by John that mapping is essentially there via nonce and sender. |
||
|
|
||
| func hasWirePayForMessage(tx sdk.Tx) bool { | ||
| for _, msg := range tx.GetMsgs() { | ||
| if msg.Type() == types.TypeMsgPayforMessage { | ||
| return true | ||
| } | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| // processMsgs will perform the processing required by PreProcessTxs for a set | ||
| // of sdk.Msg's from a single sdk.Tx | ||
| func (app *App) processMsg(msg sdk.Msg) (core.Message, *types.TxSignedTransactionDataPayForMessage, error) { | ||
| squareSize := app.SquareSize() | ||
| // reject all msgs in tx if a single included msg is not correct type | ||
| wireMsg, ok := msg.(*types.MsgWirePayForMessage) | ||
| if !ok { | ||
| return core.Message{}, | ||
| nil, | ||
| errors.New("transaction contained a message type other than types.MsgWirePayForMessage") | ||
| } | ||
|
|
||
| // make sure that a ShareCommitAndSignature of the correct size is | ||
| // included in the message | ||
| var shareCommit types.ShareCommitAndSignature | ||
| for _, commit := range wireMsg.MessageShareCommitment { | ||
| if commit.K == squareSize { | ||
| shareCommit = commit | ||
| } | ||
| } | ||
| // K == 0 means there was no share commit with the desired current square size | ||
| if shareCommit.K == 0 { | ||
| return core.Message{}, | ||
| nil, | ||
| fmt.Errorf("No share commit for correct square size. Current square size: %d", squareSize) | ||
| } | ||
|
|
||
| // add the message to the list of core message to be returned to ll-core | ||
| coreMsg := core.Message{ | ||
| NamespaceId: wireMsg.GetMessageNameSpaceId(), | ||
| Data: wireMsg.GetMessage(), | ||
| } | ||
|
|
||
| // wrap the signed transaction data | ||
| sTxData, err := wireMsg.SignedTransactionDataPayForMessage(squareSize) | ||
| if err != nil { | ||
| return core.Message{}, nil, err | ||
| } | ||
|
|
||
| signedData := &types.TxSignedTransactionDataPayForMessage{ | ||
| Message: sTxData, | ||
| Signature: shareCommit.Signature, | ||
| PublicKey: wireMsg.PublicKey, | ||
| } | ||
|
|
||
| return coreMsg, signedData, nil | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, didn't we say to not execute the Tx immediately until we add explicit ABCI methods for that purpose?
See: https://github.com/tendermint/spec/issues/208#issuecomment-721158558
and tendermint/spec#254
Also, if run the Tx during preprocess, won't that app still try to execute the Tx from the previous block during, too?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sdk uses the
runTxmethod to run stateful checks and execute any state changing transactions. While therunTxModeDeliver(signal to keep changes to the state) flag is being passed torunTx, the handler doesn't do anything (see below).Under the hood, there are two distinct transactions, so we can specify which state needs to change during
PreprocessTxsand which need to change duringDeliverTx. There isMsgWirePayForMessage, which is passed toPreprocessTxsand is being pushed throughrunTxduringPreprocessTxs. There's alsoSignedTransactionDataPayForMessage, which is derived fromMsgWirePayForMessageas described in the spec, which is not being pushed throughrunTxuntilDeliverTxand is instead returned byPreprocessTxs. Each is ran separately, and each has its own signature.The actual actions that occur during transaction execution are dictated in the module's
Keeper. We can look at what the handler forMsgWirePayForMessagedoes here, which is nothing atm. There is not currently a handler forSignedTransactionDataPayForMessage, which will throw an error if it is ran later viaDeliverTx.While throwing an error ifactually, this shouldn't throw an error! I'll include a handler that doesn't do anything.SignedTransactionDataPayForMessageis passed toDeliverTxwas intentional, I think having a handler for it that always throws an error would be more explicit.This brings up that I should have better documentation for all of this "under-the-hood" stuff, even if some of it is temporary. Where would the best place for that be? I was planning on making a full blown integration test later #24, but this might need to be done in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, the
runTxModeDeliversounds like it will actually deliver the Txs. That is what confused me. Thanks for clarifying.This brings us back to that ADR discussion. Maybe we should document this kind of decision in light-weight ADRs that either precede or accompany PRs. And yes, (integration) tests would be a great way to increase confidence that everything behaves as we expect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After talking with @evan-forbes in person, we've decided to drop the runTx call and rely soley on checkTx for now. We will still have to take care of this in the future but we can likely tackle this together with immediate execution (which, as far as I understand is required for fee burning anyway).