Skip to content

Commit

Permalink
feat: Ability to add new pools with options
Browse files Browse the repository at this point in the history
  • Loading branch information
cv65kr committed Dec 16, 2024
1 parent 313074c commit c97ed56
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 0 deletions.
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,7 @@ golang.org/x/oauth2 v0.19.0/go.mod h1:vYi7skDa1x015PmRRYZ7+s1cWyPgrPiSYRe4rnsexc
golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE=
golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -1137,6 +1138,7 @@ golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0=
golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk=
golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4=
golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E=
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
Expand Down
12 changes: 12 additions & 0 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,18 @@ func (p *Plugin) NewPool(ctx context.Context, cfg *pool.Config, env map[string]s
return pl, nil
}

func (p *Plugin) NewPoolWithOptions(ctx context.Context, cfg *pool.Config, env map[string]string, _ *zap.Logger, options ...staticPool.Options) (*staticPool.Pool, error) {
p.mu.Lock()
defer p.mu.Unlock()

pl, err := staticPool.NewPool(ctx, pool.Command(p.customCmd(env)), p.factory, cfg, p.log, options...)
if err != nil {
return nil, err
}

return pl, nil
}

// UID returns a user id (if specified by user)
func (p *Plugin) UID() int {
if p.cfg.User == "" {
Expand Down
1 change: 1 addition & 0 deletions tests/plugin_pipes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Configurer interface {
// Server creates workers for the application.
type Server interface {
NewPool(ctx context.Context, cfg *pool.Config, env map[string]string, _ *zap.Logger) (*staticPool.Pool, error)
NewPoolWithOptions(ctx context.Context, cfg *pool.Config, env map[string]string, _ *zap.Logger, options ...staticPool.Options) (*staticPool.Pool, error)
NewWorker(ctx context.Context, env map[string]string) (*worker.Process, error)
}

Expand Down
98 changes: 98 additions & 0 deletions tests/plugin_pool_with_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package tests

import (
"context"
staticPool "github.com/roadrunner-server/pool/pool/static_pool"

"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/pool/payload"
serverImpl "github.com/roadrunner-server/server/v5"
)

type Foo5 struct {
configProvider Configurer
wf Server
pool Pool
}

func (f *Foo5) Init(p Configurer, workerFactory Server) error {
f.configProvider = p
f.wf = workerFactory
return nil
}

func (f *Foo5) Serve() chan error {
const op = errors.Op("serve")
var err error
errCh := make(chan error, 1)
conf := &serverImpl.Config{}

// test payload for echo
r := &payload.Payload{
Context: nil,
Body: []byte(Response),
}

err = f.configProvider.UnmarshalKey(ConfigSection, conf)
if err != nil {
errCh <- err
return errCh
}

// test worker creation
w, err := f.wf.NewWorker(context.Background(), nil)
if err != nil {
errCh <- err
return errCh
}

go func() {
_ = w.Wait()
}()

rsp, err := w.Exec(context.Background(), r)
if err != nil {
errCh <- err
return errCh
}

if string(rsp.Body) != Response {
errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body))
return errCh
}

// should not be errors
err = w.Stop()
if err != nil {
errCh <- err
return errCh
}

// test pool
f.pool, err = f.wf.NewPoolWithOptions(context.Background(), testPoolConfig, nil, nil, staticPool.WithQueueSize(10))
if err != nil {
errCh <- err
return errCh
}

// test pool execution
rs, err := f.pool.Exec(context.Background(), r, make(chan struct{}))
if err != nil {
errCh <- err
return errCh
}

rspp := <-rs

// echo of the "test" should be -> test
if string(rspp.Body()) != Response {
errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rspp.Body()))
return errCh
}

return errCh
}

func (f *Foo5) Stop(context.Context) error {
return nil
}
59 changes: 59 additions & 0 deletions tests/server_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,3 +730,62 @@ func TestOnInitMetrics(t *testing.T) {
stopCh <- struct{}{}
wg.Wait()
}

func TestNewPoolWithOptions(t *testing.T) {
cont := endure.New(slog.LevelDebug)

// config plugin
vp := &config.Plugin{
Version: "v2024.1.0",
Path: "configs/.rr-tcp.yaml",
}

err := cont.RegisterAll(
vp,
&server.Plugin{},
&Foo5{},
&logger.Plugin{},
)
require.NoError(t, err)

err = cont.Init()
require.NoError(t, err)

ch, err := cont.Serve()
require.NoError(t, err)

// stop by CTRL+C
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

wg := &sync.WaitGroup{}
wg.Add(1)

stopCh := make(chan struct{}, 1)

go func() {
defer wg.Done()
for {
select {
case e := <-ch:
assert.Fail(t, "error", e.Error.Error())
case <-sig:
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}
return
case <-stopCh:
// timeout
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}
return
}
}
}()

stopCh <- struct{}{}
wg.Wait()
}

0 comments on commit c97ed56

Please sign in to comment.