-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: ADR-038 plugin system #10639
feat: ADR-038 plugin system #10639
Conversation
163af5f
to
5a9cd0e
Compare
I am encouraged by this update that ensures reliable and consistent state sync through listeners. One potential use case of a fully replicated state store through this approach would be to allow for extensible query stores. Roughly speaking if the msg router middle wear components support a plugin architecture as well (at some point in the future) then a more powerful indexing state store could be seamlessly added to a given node with support for additional query messages. Example: Proper advanced queries with paging and ordering support could be supported by a parallel postgres db that was only added to specific nodes as required. This configuration might use a plugin state listener and a plugin that adds query msgs to the router. The postgres database would be kept in sync using the state listener interface. Existing modules that write a bunch of kvstore indexes could be pruned back and simplified as the more powerful pluggable solutions could be added when needed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Some questions and nits. Also it would be good to have some kind of test for the loader, as there's a lot of pieces to it, and it would help document the workflow for creating plugins too.
Init(env serverTypes.AppOptions) error | ||
|
||
// Closer interface to shutting down the plugin process | ||
io.Closer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is equivalent to a Close() error
method right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, this basically extends https://pkg.go.dev/io#Closer interface
@@ -0,0 +1,9 @@ | |||
include mk/header.mk |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this and footer.mk
? I think I'm missing something here. I assume d
and go-pkg-name
are defined in this header
And should these Makefiles be incorporated into the main sdk Makefile?
COSMOS_PLUGINS ?= | ||
export COSMOS_PLUGINS | ||
|
||
$(d)/preload.go: d:=$(d) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
up, let's add doc comments.
// generate the new file | ||
dstFile, err := fss.openBeginBlockFile(req) | ||
if err != nil { | ||
fss.ackStatus = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO this would be a bit cleaner/more readable with defer func() { if err != nil { fss.ackStatus = false } }()
, although it requires err
to be a named return value.
Or couldn't ackStatus
just be set to false first, then true on a successful return?
pkg=${2:?second parameter with full name of the package is required} | ||
main_pkg="$dir/main" | ||
|
||
shortpkg="uniquepkgname" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
shortpkg="uniquepkgname" | |
shortpkg="pkg" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
definitely, the original is not that "short" ;)
This package contains an extensible plugin system for the Cosmos-SDK. Included in this top-level package is the base interface | ||
for a Cosmos-SDK plugin, as well as more specific plugin interface definitions that build on top of this base interface. | ||
The [loader](./loader) sub-directory contains the Go package and scripts for loading plugins into the SDK. The [plugins](./plugins) | ||
sub-directory contains the preloaded plugins and a script for building them, this is also the directory that the plugin loader will look |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sub-directory contains the preloaded plugins and a script for building them, this is also the directory that the plugin loader will look | |
sub-directory contains the preloaded plugins and a script for building them, this is also the directory where the plugin loader will look |
|
||
As mentioned above, some plugins can be preloaded. This means they do not need to be loaded from the specified `plugins.dir` and instead | ||
are loaded by default. At this time the only preloaded plugin is the [file streaming service plugin](./plugins/file). | ||
Plugins can be added to the preloaded set by adding the plugin to the [plugins dir](../../plugin/plugin.go) and modifying the [preload_list](../../plugin/loader/preload_list). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Plugins can be added to the preloaded set by adding the plugin to the [plugins dir](../../plugin/plugin.go) and modifying the [preload_list](../../plugin/loader/preload_list). | |
Plugins can be added to the preloaded set by adding the plugin to the [plugins dir](./plugins/) and modifying the [preload_list](./loader/preload_list). |
[plugins] | ||
on = false # turn the plugin system, as a whole, on or off | ||
disabled = ["list", "of", "plugin", "names", "to", "disable"] | ||
dir = "the directory to load non-preloaded plugins from; defaults to cosmos-sdk/plugin/plugins" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
more precisely $GOPATH/src/github.com/cosmos/cosmos-sdk/plugin/plugins
(but it expects to find built libraries there? sorry if I'm misunderstanding, I haven't fully tested this)
``` | ||
|
||
As mentioned above, some plugins can be preloaded. This means they do not need to be loaded from the specified `plugins.dir` and instead | ||
are loaded by default. At this time the only preloaded plugin is the [file streaming service plugin](./plugins/file). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if I understand right, preloaded plugins are intended to be those built in to the SDK, that are always available but can be disabled.
And if the user wants to add their own new plugin, the workflow should go roughly like:
- Write the plugin
- Set
plugins.dir
to whatever location contains the built library - Set other config options if needed
Done. Whereas adding a new preloaded plugin involves the above, plus updating preload_list
and generating the boilerplate files within the plugins/
directory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or... we can load plugins manually in app.go (the example is below). I think both options are valid. The latter one (manual) is less magic, but will fail with automatic SoftwareUpgrade x/gov proposal + cosmovisor.
So:
- need to update the docs about compatibility with SoftwareUpgrade x/gov proposal
- update ADR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left a few questions. sorry for the delay in reviews
pluginsOnKey := fmt.Sprintf("%s.%s", plugin.PLUGINS_TOML_KEY, plugin.PLUGINS_ON_TOML_KEY) | ||
if cast.ToBool(appOpts.Get(pluginsOnKey)) { | ||
// this loads the preloaded and any plugins found in `plugins.dir` | ||
pluginLoader, err := loader.NewPluginLoader(appOpts, logger) | ||
if err != nil { | ||
// handle error | ||
} | ||
|
||
// initialize the loaded plugins | ||
if err := pluginLoader.Initialize(); err != nil { | ||
// handle error | ||
} | ||
|
||
// register the plugin(s) with the BaseApp | ||
if err := pluginLoader.Inject(bApp, appCodec, keys); err != nil { | ||
// handle error | ||
} | ||
|
||
// start the plugin services, optionally use wg to synchronize shutdown using io.Closer | ||
wg := new(sync.WaitGroup) | ||
if err := pluginLoader.Start(wg); err != nil { | ||
// handler error | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does this need to go into simapp? Is this to get this released sooner than later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this stage, it has to be somehow integrated. With app wiring, some parts could be automated, but this is not available at the time of writing this design.
present in this mapping will be dependent on the specific `StateStreamingPlugin`, but we will introduce some standards | ||
here using the file `StateStreamingPlugin`: | ||
|
||
Plugin TOML configuration should be split into separate sub-tables for each kind of plugin (e.g. `plugins.streaming`). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a case a node would want to support multiple streamers? Also would be good to see how this effects the nodes performance. I can't remember if this is blocking in anyway?
```toml | ||
[plugins] | ||
on = false # turn the plugin system, as a whole, on or off | ||
disabled = ["list", "of", "plugin", "names", "to", "disable"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could this be opt in by default not opt out?
# in milliseconds | ||
global_ack_wait_limit = 500 | ||
[plugins.streaming.file] # the specific parameters for the file streaming service plugin | ||
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if left empty it will stream everything right?
@@ -0,0 +1,155 @@ | |||
# Comsos-SDK Plugins |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could this be its own go mod? This would allow for easier release cycles and allow us to tag 1.0 on the streamer sooner.
If this path is taken I would propose 4 months of v1-alpha and v1 after that so we can get user feedback on things
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for module.
Re versioning, I think we should release v1 as soon as it works and it's tested and 2-3 plugins are integrated and tested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, let's rename this directory (and module) to indexer/plugin.
subsequent state changes are written out to this file until the first `DeliverTx` request is received. At the head of these files, | ||
the length-prefixed protobuf encoded `BeginBlock` request is written, and the response is written at the tail. | ||
|
||
After every `DeliverTx` request a new file is created with the name `block-{N}-tx-{M}` where N is the block number and M |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: would this not cause an extreme amount of files in the file system? would it not be better to have one file for all txs/state updates in a block?
@i-norden where are we at with this? Looks like reviews about a month ago but no activity since? |
// each listener has an internal wait threshold after which it sends `false` to the ListenSuccess() channel | ||
// but the BaseApp also imposes a global wait limit | ||
if app.globalWaitLimit > 0 { | ||
maxWait := time.NewTicker(app.globalWaitLimit) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maxWait := time.NewTicker(app.globalWaitLimit) | |
maxWait := time.NewTicker(app.globalWaitLimit) | |
defer maxWait.Stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious if linter can flag this.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't review:
- loading mechanism ... is there a better way than doing it with makefile and compile options?
IMHO, we should update and revisit this changes - see review comments.
app.halt() | ||
} | ||
case <-maxWait.C: | ||
app.halt() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why we are halting the app in this case? Do we de prioritize blockchain liveness? Moreoveer, I don't think we should really block the Commit phase. This should happen in parallel: listeners must be designed to consume faster then then the blockchain speed, which is not a problem for all sane designs.
If we will agree here, then we will need to update the ADR as well.
@@ -129,6 +130,10 @@ type BaseApp struct { // nolint: maligned | |||
// abciListeners for hooking into the ABCI message processing of the BaseApp | |||
// and exposing the requests and responses to external consumers | |||
abciListeners []ABCIListener | |||
|
|||
// globalWaitTime is the maximum amount of time the BaseApp will wait for positive acknowledgement of message | |||
// receipt from ABCIListeners before halting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please update the docs:
- we don't wait for a message, but checking that listeners consumed all block input at the by the end of the commit phase. Note: I'm against this design -> let's continue that part in the thread above.
// SetGlobalWaitLimit is used to set the maximum amount of time the BaseApp will wait for positive acknowledgement | ||
// of message receipt from ABCIListeners before halting | ||
func (app *BaseApp) SetGlobalWaitLimit(t time.Duration) { | ||
app.globalWaitLimit = t |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer that we put it inot BaseApp constructor to make it clear that this must be configured.
authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey, | ||
minttypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey, | ||
govtypes.StoreKey, paramstypes.StoreKey, ibchost.StoreKey, upgradetypes.StoreKey, | ||
evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's don't remove the indentation
if err := pluginLoader.Start(wg); err != nil { | ||
// handler error | ||
} | ||
// initialize the loaded plugins |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the function name is clear enough - this comment is not needed.
listeners: listeners, | ||
srcChan: listenChan, | ||
filePrefix: filePrefix, | ||
writeDir: writeDir, | ||
codec: c, | ||
stateCache: make([][]byte, 0), | ||
stateCacheLock: new(sync.Mutex), | ||
ack: ack, | ||
ackChan: make(chan bool), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ackChan: make(chan bool), | |
ackChan: make(chan bool), | |
ackStatus: true, |
// ListenDeliverTx satisfies the Hook interface | ||
// It writes out the received DeliverTx request and response and the resulting state changes out to a file as described | ||
// in the above the naming schema | ||
func (fss *FileStreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (fss *FileStreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { | |
func (fss *FileStreamingService) ConsumeDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same for other methods
close(fss.quitChan) | ||
return nil | ||
} | ||
|
||
// ListenSuccess returns a chan that is used to acknowledge successful receipt of messages by the external service | ||
// after some configurable delay, `false` is sent to this channel from the service to signify failure of receipt | ||
func (fss *FileStreamingService) ListenSuccess() <-chan bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (fss *FileStreamingService) ListenSuccess() <-chan bool { | |
func (fss *FileStreamingService) FeedSuccess() <-chan bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is not listening on anything
close(fss.quitChan) | ||
return nil | ||
} | ||
|
||
// ListenSuccess returns a chan that is used to acknowledge successful receipt of messages by the external service | ||
// after some configurable delay, `false` is sent to this channel from the service to signify failure of receipt | ||
func (fss *FileStreamingService) ListenSuccess() <-chan bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moreover, I think that ackChan
should be a parameter of this function, rather being part of the struct. Returning a private value / chan is smells badly.
@@ -108,6 +135,7 @@ func (fss *StreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestB | |||
if _, err = dstFile.Write(stateChange); err != nil { | |||
fss.stateCache = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fss.stateCache
should be a chan (and called stateChan
or dataFeed
) - this way we won't need to use lock and open a box of potential sync errors.
Hi all, many apologies for the extreme delay in review response. A lot of things here have been deprecated or permutated by @egaxhaj-figure 's PR that is built on top of this and the corresponding spec changes introduced in #11175. I'm going to leave this open for the time being, and will begin working through the comments here and transposing them over to the new PR if they are still relevant. |
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. |
What is left from this work? should we close it or? |
All is left is for @i-norden to work through the comments on this and the additional work in #11691 built on top of this. #10639 (comment) @i-norden have you been able to make progress on this? |
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. |
For #10096
Implements #10482
This is an extension and refactor of the existing ADR-038 streaming service work to introduce a plugin system to the SDK and load streaming services using this system rather than building them into the SDK binary.
The plugin system introduced here is meant to be extensible, so that if other components/features of the SDK wish to be included as plugins in the future they can extend the base plugin interface defined here and leverage this plugin building and loading/preloading system.
This PR also ports the
FileStreamingService
to the new plugin architecture, serving as an example of how to build state streaming plugins.This PR also updates to add the success/failure acknowledgement feature for the external streaming services as discussed in #10482
Before we can merge this PR, please make sure that all the following items have been
checked off. If any of the checklist items are not applicable, please leave them but
write a little note why.
docs/
) or specification (x/<module>/spec/
)godoc
comments.Unreleased
section inCHANGELOG.md
Files changed
in the Github PR explorerCodecov Report
in the comment section below once CI passes