From 2367a151e72e3b1f0e88373b884d65f3207cc846 Mon Sep 17 00:00:00 2001 From: Kryvchun Date: Fri, 24 Jun 2022 14:30:58 +0300 Subject: [PATCH 1/3] feat: support connections pool --- .gitignore | 3 +- cmd/example/main.go | 11 +- docker-compose.yml | 42 +++++++ readme.md | 38 +----- sonic/control.go | 26 +++-- sonic/controller.go | 50 ++++++++ sonic/controller_test.go | 28 +++++ sonic/driver.go | 47 +++++++- sonic/ingester.go | 123 ++++++++++++-------- sonic/ingester_test.go | 126 ++++++++++++++++---- sonic/options.go | 79 +++++++++++++ sonic/pool.go | 162 ++++++++++++++++++++++++++ sonic/pool_test.go | 246 +++++++++++++++++++++++++++++++++++++++ sonic/search.go | 49 +++++--- sonic/search_test.go | 96 +++++++++++++++ sonic/sonic_test.go | 50 ++++++++ testdata/Dockerfile | 3 + testdata/sonic.cfg | 66 +++++++++++ 18 files changed, 1104 insertions(+), 141 deletions(-) create mode 100644 docker-compose.yml create mode 100644 sonic/controller.go create mode 100644 sonic/controller_test.go create mode 100644 sonic/options.go create mode 100644 sonic/pool.go create mode 100644 sonic/pool_test.go create mode 100644 sonic/search_test.go create mode 100644 sonic/sonic_test.go create mode 100644 testdata/Dockerfile create mode 100644 testdata/sonic.cfg diff --git a/.gitignore b/.gitignore index a999088..7f898b5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .idea/* .idea/**/* -.idea \ No newline at end of file +.idea +.vscode diff --git a/cmd/example/main.go b/cmd/example/main.go index fcf9c2d..9fd2abd 100644 --- a/cmd/example/main.go +++ b/cmd/example/main.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "time" "github.com/expectedsh/go-sonic/sonic" ) @@ -9,8 +10,14 @@ import ( const pswd = "SecretPassword" func main() { - - ingester, err := sonic.NewIngester("localhost", 1491, pswd) + ingester, err := sonic.NewIngester( + "localhost", + 1491, + pswd, + sonic.OptionPoolMaxIdleConnections(16), + sonic.OptionPoolMinIdleConnections(1), + sonic.OptionPoolPingThreshold(time.Minute), + ) if err != nil { panic(err) } diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..c19ea2e --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,42 @@ +--- +version: "3" + +# This file is used for testing the library. +# +# Running tests in docker: +# docker-compose run --rm test +# +# Running tests locally: +# export TEST_SONIC_ADDR="sonic://:SecretPassword@127.0.0.1:1491" +# docker-compose up -d sonic +# go test ./... +# +# Cleanup: +# docker-compose down + +services: + sonic: + build: + context: ./testdata + volumes: + - ./testdata/sonic.cfg:/etc/sonic.cfg:ro + ports: + - "127.0.0.1:1491:1491" + environment: + RUST_BACKTRACE: "full" + healthcheck: + test: nc -z 127.0.0.1 1491 + interval: 5s + timeout: 3s + retries: 7 + + test: + image: "golang:1.18.3" + depends_on: + sonic: + condition: service_healthy + volumes: + - .:/app:ro + environment: + - TEST_SONIC_ADDR=sonic://:SecretPassword@sonic:1491 + command: bash -c "cd /app && go test ./..." diff --git a/readme.md b/readme.md index 7553eaf..a1063a4 100644 --- a/readme.md +++ b/readme.md @@ -21,7 +21,6 @@ import ( ) func main() { - ingester, err := sonic.NewIngester("localhost", 1491, "SecretPassword") if err != nil { panic(err) @@ -67,39 +66,4 @@ Hardware detail: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz ### Thread Safety -The driver itself isn't thread safe. You could use locks or channels to avoid crashes. - -```go -package main - -import ( - "fmt" - - "github.com/expectedsh/go-sonic/sonic" -) - -func main() { - events := make(chan []string, 1) - - event := []string{"some_text", "some_id"} - tryCrash := func() { - for { - // replace "event" with whatever is giving you events: pubsub, amqp messages… - events <- event - } - } - - go tryCrash() - go tryCrash() - go tryCrash() - go tryCrash() - - ingester, _ := sonic.NewIngester("localhost", 1491, "SecretPassword") - - for { - msg := <-events - // Or use some buffering along with BulkPush - ingester.Push("collection", "bucket", msg[1], msg[0]) - } -} -``` +The driver itself uses a connections pool, and it is thread-safe. diff --git a/sonic/control.go b/sonic/control.go index f0f44e9..48cb87f 100644 --- a/sonic/control.go +++ b/sonic/control.go @@ -23,24 +23,19 @@ type Controllable interface { // controlChannel is used for administration purposes. type controlChannel struct { - *driver + *driversHolder } // NewControl create a new driver instance with a controlChannel instance. // Only way to get a Controllable implementation. -func NewControl(host string, port int, password string) (Controllable, error) { - driver := &driver{ - Host: host, - Port: port, - Password: password, - channel: Control, - } - err := driver.Connect() +func NewControl(host string, port int, password string, opts ...OptionSetter) (Controllable, error) { + driversHolder, err := newDriversHolder(defaultOptions(host, port, password, Control).With(opts...)) if err != nil { return nil, err } + return controlChannel{ - driver: driver, + driversHolder: driversHolder, }, nil } @@ -48,13 +43,20 @@ func (c controlChannel) Trigger(action Action) (err error) { if !IsActionValid(action) { return ErrActionName } - err = c.write(fmt.Sprintf("TRIGGER %s", action)) + + d, err := c.Get() + if err != nil { + return err + } + defer d.close() + + err = d.write(fmt.Sprintf("TRIGGER %s", action)) if err != nil { return err } // should get OK - _, err = c.read() + _, err = d.read() if err != nil { return err } diff --git a/sonic/controller.go b/sonic/controller.go new file mode 100644 index 0000000..736602d --- /dev/null +++ b/sonic/controller.go @@ -0,0 +1,50 @@ +package sonic + +// driversHolder defines base interface around driversPool. +type driversHolder struct { + *driversPool +} + +func newDriversHolder( + opts controllerOptions, +) (*driversHolder, error) { + df := driverFactory{ + Host: opts.Host, + Port: opts.Port, + Password: opts.Password, + Channel: opts.Channel, + } + + dp, err := newDriversPool( + &df, + opts.PoolMinConnections, + opts.PoolMaxConnections, + opts.PoolPingThreshold, + opts.PoolMaxIdleLifetime, + ) + if err != nil { + return nil, err + } + + return &driversHolder{ + driversPool: dp, + }, nil +} + +// Quit all connections and close the pool. It never returns an error. +func (c *driversHolder) Quit() error { + c.driversPool.Close() + + return nil +} + +// Ping one connection. +func (c *driversHolder) Ping() error { + d, err := c.Get() + if err != nil { + return err + } + defer d.close() + + return d.Ping() +} diff --git a/sonic/controller_test.go b/sonic/controller_test.go new file mode 100644 index 0000000..ac3d9e8 --- /dev/null +++ b/sonic/controller_test.go @@ -0,0 +1,28 @@ +package sonic_test + +import ( + "testing" + + "github.com/expectedsh/go-sonic/sonic" +) + +func TestController(t *testing.T) { + t.Parallel() + + var ctrl sonic.Base = getIngester(t) + + err := ctrl.Ping() + if err != nil { + t.Fatal("Ping", err) + } + + err = ctrl.Quit() + if err != nil { + t.Fatal("Quit", err) + } + + err = ctrl.Ping() + if err == nil { + t.Fatal("Ping", err) + } +} diff --git a/sonic/driver.go b/sonic/driver.go index b1b846a..0e69275 100644 --- a/sonic/driver.go +++ b/sonic/driver.go @@ -2,6 +2,7 @@ package sonic import ( "errors" + "time" ) var ( @@ -32,10 +33,32 @@ type driver struct { Port int Password string + lastUse time.Time + lastPing time.Time + channel Channel *connection } +type driverFactory struct { + Host string + Port int + Password string + Channel Channel +} + +func (df driverFactory) Build() *driver { + return &driver{ + Host: df.Host, + Port: df.Port, + Password: df.Password, + channel: df.Channel, + + lastPing: time.Time{}, + connection: nil, + } +} + // Connect open a connection via TCP with the sonic server. func (c *driver) Connect() error { if !IsChannelValid(c.channel) { @@ -43,7 +66,11 @@ func (c *driver) Connect() error { } var err error + c.connection, err = newConnection(c) + c.lastPing = time.Now() + c.lastUse = time.Now() + return err } @@ -60,7 +87,7 @@ func (c *driver) Quit() error { return err } -func (c driver) Ping() error { +func (c *driver) Ping() error { err := c.write("PING") if err != nil { return err @@ -71,5 +98,23 @@ func (c driver) Ping() error { if err != nil { return err } + + c.lastPing = time.Now() + return nil } + +// softPing pings the connection if it wasn't pinged for a while. +func (c *driver) checkConn(pingThreshold, maxLifetime time.Duration) (ok bool) { + if maxLifetime > 0 && time.Since(c.lastUse) > maxLifetime { + return false + } + + c.lastUse = time.Now() + + if pingThreshold > 0 && time.Since(c.lastPing) > pingThreshold { + return c.Ping() == nil + } + + return true +} diff --git a/sonic/ingester.go b/sonic/ingester.go index fab7869..9e9669b 100644 --- a/sonic/ingester.go +++ b/sonic/ingester.go @@ -75,28 +75,39 @@ const ( ) type ingesterChannel struct { - *driver + *driversHolder } // NewIngester create a new driver instance with a ingesterChannel instance. // Only way to get a Ingestable implementation. -func NewIngester(host string, port int, password string) (Ingestable, error) { - driver := &driver{ - Host: host, - Port: port, - Password: password, - channel: Ingest, - } - err := driver.Connect() +func NewIngester( + host string, + port int, + password string, + opts ...OptionSetter, +) (Ingestable, error) { + driversHolder, err := newDriversHolder(defaultOptions( + host, + port, + password, + Ingest, + ).With(opts...)) if err != nil { return nil, err } + return ingesterChannel{ - driver: driver, + driversHolder: driversHolder, }, nil } func (i ingesterChannel) Push(collection, bucket, object, text string, lang Lang) (err error) { + d, err := i.Get() + if err != nil { + return err + } + defer d.close() + // patterns := []struct { Pattern string @@ -108,18 +119,18 @@ func (i ingesterChannel) Push(collection, bucket, object, text string, lang Lang text = strings.Replace(text, v.Pattern, v.Replacement, -1) } - chunks := splitText(text, i.cmdMaxBytes/2) + chunks := splitText(text, d.cmdMaxBytes/2) // split chunks with partial success will yield single error for _, chunk := range chunks { ff := fmt.Sprintf("%s %s %s %s \"%s\""+langFormat(lang), push, collection, bucket, object, chunk, lang) - err = i.write(ff) + err = d.write(ff) if err != nil { return err } // sonic should sent OK - _, err = i.read() + _, err = d.read() if err != nil { return err } @@ -169,26 +180,22 @@ func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines in for _, r := range divided { r := r - go func(driver *driver, collection, bucket string, recs []IngestBulkRecord, bulkErrorChan chan<- []IngestBulkError) { + go func( + collection string, + bucket string, + recs []IngestBulkRecord, + bulkErrorChan chan<- []IngestBulkError, + ) { errs := make([]IngestBulkError, 0) - newIngester, _ := NewIngester(driver.Host, driver.Port, driver.Password) for _, rec := range recs { - if newIngester == nil { - addBulkError(&errs, rec, ErrClosed) - continue - } - err := newIngester.Push(collection, bucket, rec.Object, rec.Text, lang) + err := i.Push(collection, bucket, rec.Object, rec.Text, lang) if err != nil { addBulkError(&errs, rec, err) } } - - if newIngester != nil { - _ = newIngester.Quit() - } bulkErrorChan <- errs - }(i.driver, collection, bucket, r, bulkErrorChan) + }(collection, bucket, r, bulkErrorChan) } errs = make([]IngestBulkError, 0) @@ -200,13 +207,19 @@ func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines in } func (i ingesterChannel) Pop(collection, bucket, object, text string) (err error) { - err = i.write(fmt.Sprintf("%s %s %s %s \"%s\"", pop, collection, bucket, object, text)) + d, err := i.Get() + if err != nil { + return err + } + defer d.close() + + err = d.write(fmt.Sprintf("%s %s %s %s \"%s\"", pop, collection, bucket, object, text)) if err != nil { return err } // sonic should sent OK - _, err = i.read() + _, err = d.read() if err != nil { return err } @@ -226,29 +239,19 @@ func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int for _, r := range divided { r := r - go func(driver *driver, collection, bucket string, recs []IngestBulkRecord, bulkErrorChan chan<- []IngestBulkError) { + go func(collection, bucket string, recs []IngestBulkRecord, bulkErrorChan chan<- []IngestBulkError) { errs := make([]IngestBulkError, 0) - newIngester, _ := NewIngester(driver.Host, driver.Port, driver.Password) for _, rec := range recs { - if newIngester == nil { - addBulkError(&errs, rec, ErrClosed) - continue - } - - err := newIngester.Pop(collection, bucket, rec.Object, rec.Text) + err := i.Pop(collection, bucket, rec.Object, rec.Text) if err != nil { addBulkError(&errs, rec, err) } } - if newIngester != nil { - _ = newIngester.Quit() - } - bulkErrorChan <- errs - }(i.driver, collection, bucket, r, bulkErrorChan) + }(collection, bucket, r, bulkErrorChan) } errs = make([]IngestBulkError, 0) @@ -260,13 +263,19 @@ func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int } func (i ingesterChannel) Count(collection, bucket, object string) (cnt int, err error) { - err = i.write(fmt.Sprintf("%s %s %s", count, collection, buildCountQuery(bucket, object))) + d, err := i.Get() + if err != nil { + return 0, err + } + defer d.close() + + err = d.write(fmt.Sprintf("%s %s %s", count, collection, buildCountQuery(bucket, object))) if err != nil { return 0, err } // RESULT NUMBER - r, err := i.read() + r, err := d.read() if err != nil { return 0, err } @@ -285,13 +294,19 @@ func buildCountQuery(bucket, object string) string { } func (i ingesterChannel) FlushCollection(collection string) (err error) { - err = i.write(fmt.Sprintf("%s %s", flushc, collection)) + d, err := i.Get() + if err != nil { + return err + } + defer d.close() + + err = d.write(fmt.Sprintf("%s %s", flushc, collection)) if err != nil { return err } // sonic should sent OK - _, err = i.read() + _, err = d.read() if err != nil { return err } @@ -299,13 +314,19 @@ func (i ingesterChannel) FlushCollection(collection string) (err error) { } func (i ingesterChannel) FlushBucket(collection, bucket string) (err error) { - err = i.write(fmt.Sprintf("%s %s %s", flushb, collection, bucket)) + d, err := i.Get() + if err != nil { + return err + } + defer d.close() + + err = d.write(fmt.Sprintf("%s %s %s", flushb, collection, bucket)) if err != nil { return err } // sonic should sent OK - _, err = i.read() + _, err = d.read() if err != nil { return err } @@ -313,13 +334,19 @@ func (i ingesterChannel) FlushBucket(collection, bucket string) (err error) { } func (i ingesterChannel) FlushObject(collection, bucket, object string) (err error) { - err = i.write(fmt.Sprintf("%s %s %s %s", flusho, collection, bucket, object)) + d, err := i.Get() + if err != nil { + return err + } + defer d.close() + + err = d.write(fmt.Sprintf("%s %s %s %s", flusho, collection, bucket, object)) if err != nil { return err } // sonic should sent OK - _, err = i.read() + _, err = d.read() if err != nil { return err } diff --git a/sonic/ingester_test.go b/sonic/ingester_test.go index 15f1777..4b78a8f 100644 --- a/sonic/ingester_test.go +++ b/sonic/ingester_test.go @@ -1,21 +1,33 @@ -package sonic +package sonic_test import ( "math/rand" "runtime" + "strconv" "testing" "time" + + "github.com/expectedsh/go-sonic/sonic" ) -var records = make([]IngestBulkRecord, 0) -var ingester, err = NewIngester("localhost", 1491, "SecretPassword") +var records = make([]sonic.IngestBulkRecord, 0) -func BenchmarkIngesterChannel_BulkPush2XMaxCPUs(b *testing.B) { +func getIngester(tb testing.TB) sonic.Ingestable { + tb.Helper() + + host, port, pass := getSonicConfig(tb) + + ing, err := sonic.NewIngester(host, port, pass) if err != nil { - return + tb.Fatal(err) } + return ing +} + +func BenchmarkIngesterChannel_BulkPush2XMaxCPUs(b *testing.B) { cpus := 2 * runtime.NumCPU() + ingester := getIngester(b) for n := 0; n < b.N; n++ { e := ingester.FlushBucket("test", "test2XMaxCpus") @@ -23,7 +35,7 @@ func BenchmarkIngesterChannel_BulkPush2XMaxCPUs(b *testing.B) { b.Log(e) b.Fail() } - be := ingester.BulkPush("test", "test2XMaxCpus", cpus, records, LangAutoDetect) + be := ingester.BulkPush("test", "test2XMaxCpus", cpus, records, sonic.LangAutoDetect) if len(be) > 0 { b.Log(be, e) b.Fail() @@ -32,9 +44,7 @@ func BenchmarkIngesterChannel_BulkPush2XMaxCPUs(b *testing.B) { } func BenchmarkIngesterChannel_BulkPushMaxCPUs(b *testing.B) { - if err != nil { - return - } + ingester := getIngester(b) cpus := runtime.NumCPU() @@ -44,7 +54,7 @@ func BenchmarkIngesterChannel_BulkPushMaxCPUs(b *testing.B) { b.Log(e) b.Fail() } - be := ingester.BulkPush("test", "testMaxCpus", cpus, records, LangAutoDetect) + be := ingester.BulkPush("test", "testMaxCpus", cpus, records, sonic.LangAutoDetect) if len(be) > 0 { b.Log(be, e) b.Fail() @@ -53,9 +63,7 @@ func BenchmarkIngesterChannel_BulkPushMaxCPUs(b *testing.B) { } func BenchmarkIngesterChannel_BulkPush10(b *testing.B) { - if err != nil { - return - } + ingester := getIngester(b) for n := 0; n < b.N; n++ { e := ingester.FlushBucket("test", "test10") @@ -63,18 +71,15 @@ func BenchmarkIngesterChannel_BulkPush10(b *testing.B) { b.Log(e) b.Fail() } - be := ingester.BulkPush("test", "test10", 10, records, LangAutoDetect) + be := ingester.BulkPush("test", "test10", 10, records, sonic.LangAutoDetect) if len(be) > 0 { - b.Log(be, err) b.Fail() } } } func BenchmarkIngesterChannel_BulkPop10(b *testing.B) { - if err != nil { - return - } + ingester := getIngester(b) for n := 0; n < b.N; n++ { e := ingester.FlushBucket("test", "popTest10") @@ -84,16 +89,13 @@ func BenchmarkIngesterChannel_BulkPop10(b *testing.B) { } be := ingester.BulkPop("test", "popTest10", 10, records) if len(be) > 0 { - b.Log(be, err) b.Fail() } } } func BenchmarkIngesterChannel_Push(b *testing.B) { - if err != nil { - return - } + ingester := getIngester(b) for n := 0; n < b.N; n++ { e := ingester.FlushBucket("test", "testBulk") @@ -102,7 +104,7 @@ func BenchmarkIngesterChannel_Push(b *testing.B) { b.Fail() } for _, v := range records { - e := ingester.Push("test", "testBulk", v.Object, v.Text, LangAutoDetect) + e := ingester.Push("test", "testBulk", v.Object, v.Text, sonic.LangAutoDetect) if e != nil { b.Log(e) b.Fail() @@ -125,6 +127,82 @@ func randStr(length int, charset string) string { func init() { for n := 0; n < 3000; n++ { - records = append(records, IngestBulkRecord{randStr(10, charset), randStr(10, charset)}) + records = append(records, sonic.IngestBulkRecord{randStr(10, charset), randStr(10, charset)}) + } +} + +func TestIngester_Push_Count(t *testing.T) { + t.Parallel() + + host, port, pass := getSonicConfig(t) + + col := t.Name() + bucket := strconv.FormatInt(time.Now().UnixNano(), 10) + + ing, err := sonic.NewIngester(host, port, pass) + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + _ = ing.FlushBucket(col, bucket) + _ = ing.Quit() + }) + + err = ing.Push(col, bucket, "obj", "test", sonic.LangAutoDetect) + if err != nil { + t.Fatal("Push", err) + } + + count, err := ing.Count(col, bucket, "obj") + switch { + case err != nil: + t.Fatal("Count", err) + case count != 1: + t.Fatalf("Actual: %d, expected: %d", count, 1) + } +} + +func TestIngester_BulkPush_Count(t *testing.T) { + t.Parallel() + + host, port, pass := getSonicConfig(t) + + col := t.Name() + bucket := strconv.FormatInt(time.Now().UnixNano(), 10) + + ing, err := sonic.NewIngester(host, port, pass) + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + _ = ing.FlushBucket(col, bucket) + _ = ing.Quit() + }) + + errs := ing.BulkPush(col, bucket, 4, []sonic.IngestBulkRecord{{ + Object: "obj1", + Text: "test", + }, { + Object: "obj2", + Text: "test", + }, { + Object: "obj3", + Text: "test", + }, { + Object: "obj4", + Text: "test", + }}, sonic.LangAutoDetect) + if len(errs) != 0 { + t.Fatal("BlukPush", errs) + } + + count, err := ing.Count(col, bucket, "obj4") + switch { + case err != nil: + t.Fatal("Count", err) + case count != 1: + t.Fatalf("Actual: %d, expected: %d", count, 1) } } diff --git a/sonic/options.go b/sonic/options.go new file mode 100644 index 0000000..bdb392d --- /dev/null +++ b/sonic/options.go @@ -0,0 +1,79 @@ +package sonic + +import "time" + +type controllerOptions struct { + Host string + Port int + Password string + PoolMinConnections int + PoolMaxConnections int + PoolPingThreshold time.Duration + PoolMaxIdleLifetime time.Duration + Channel Channel +} + +func (o controllerOptions) With(optionSetters ...OptionSetter) controllerOptions { + for _, os := range optionSetters { + os(&o) + } + + return o +} + +func defaultOptions( + host string, + port int, + password string, + channel Channel, +) controllerOptions { + return controllerOptions{ + Host: host, + Port: port, + Password: password, + Channel: channel, + + PoolMinConnections: 1, + PoolMaxConnections: 16, + PoolMaxIdleLifetime: 5 * time.Minute, + PoolPingThreshold: 0, + } +} + +// OptionSetter defines an option setter function. +type OptionSetter func(*controllerOptions) + +// OptionPoolMaxConnections sets maximum number of idle connections in the pool. +// By default is 16. +func OptionPoolMaxIdleConnections(val int) OptionSetter { + return func(o *controllerOptions) { + o.PoolMaxConnections = val + } +} + +// OptionPoolMinIdleConnections sets minimum number of idle connections in the pool. +// By default is 1. +func OptionPoolMinIdleConnections(val int) OptionSetter { + return func(o *controllerOptions) { + o.PoolMinConnections = val + } +} + +// OptionPoolPingThreshold sets a minimum ping interval to ensure that +// the connection is healthy before getting it from the pool. +// +// By default is 0s. For disabling set it to 0. +func OptionPoolPingThreshold(val time.Duration) OptionSetter { + return func(o *controllerOptions) { + o.PoolPingThreshold = val + } +} + +// OptionPoolMaxIdleLifetime sets a minimum lifetime of idle connection. +// +// By default is 5m. For disabling set it to 0. +func OptionPoolMaxIdleLifetime(val time.Duration) OptionSetter { + return func(o *controllerOptions) { + o.PoolMaxIdleLifetime = val + } +} diff --git a/sonic/pool.go b/sonic/pool.go new file mode 100644 index 0000000..0991407 --- /dev/null +++ b/sonic/pool.go @@ -0,0 +1,162 @@ +package sonic + +import ( + "sync" + "time" +) + +const recursionLimit = 8 + +type driversPool struct { + driverFactory *driverFactory + drivers chan *driverWrapper + pingThreshold time.Duration + maxIdleLifetime time.Duration + + isPoolClosedMu sync.RWMutex + isPoolClosed bool +} + +func newDriversPool( + df *driverFactory, + minIdle int, + maxIdle int, + pingThreshold time.Duration, + maxIdleLifetime time.Duration, +) (*driversPool, error) { + dp := &driversPool{ + driverFactory: df, + drivers: make(chan *driverWrapper, maxIdle), + + pingThreshold: pingThreshold, + maxIdleLifetime: maxIdleLifetime, + + isPoolClosedMu: sync.RWMutex{}, + isPoolClosed: false, + } + + var err error + var dw *driverWrapper + + // Open connnections. + drivers := make([]*driverWrapper, 0, minIdle) + for i := 0; i < maxIdle; i++ { + dw, err = dp.Get() + if err != nil { + // We still need to close already opened connections. + break + } + + drivers = append(drivers, dw) + } + + // Return all connections to the pool. + for _, d := range drivers { + d.close() + } + + return dp, err +} + +// put the connection back. +func (p *driversPool) put(dw *driverWrapper) { + if dw.driver.closed { + return + } + + p.isPoolClosedMu.RLock() + defer p.isPoolClosedMu.RUnlock() + + if p.isPoolClosed { + dw.driver.close() + + return + } + + select { + case p.drivers <- dw: + default: + // The pool is full. + _ = dw.driver.Quit() + dw.driver.close() + } +} + +// Get a healthy driver from the pool. It pings the connection +// if it was configured by OptionPoolPingThreshold. +// It will open a connection if no connection is available in the pool. +// Closing of connection will return it back. +func (p *driversPool) Get() (*driverWrapper, error) { + p.isPoolClosedMu.RLock() + defer p.isPoolClosedMu.RUnlock() + + if p.isPoolClosed { + return nil, ErrClosed + } + + return p.getNextDriver(0) +} + +func (p *driversPool) getNextDriver(depth int) (*driverWrapper, error) { + if depth > recursionLimit { + return p.newDriver() + } + + select { + case d := <-p.drivers: + if !d.checkConn(p.pingThreshold, p.maxIdleLifetime) { + d.driver.close() + + return p.getNextDriver(depth + 1) + } + + return d, nil + default: + return p.newDriver() + } +} + +func (p *driversPool) newDriver() (*driverWrapper, error) { + d := p.driverFactory.Build() + + if err := d.Connect(); err != nil { + return nil, err + } + + return p.wrapDriver(d), nil +} + +// Close and quit all connections in the pool. +func (p *driversPool) Close() { + p.isPoolClosedMu.Lock() + defer p.isPoolClosedMu.Unlock() + p.isPoolClosed = true + + close(p.drivers) + for dw := range p.drivers { + if !dw.driver.closed { + _ = dw.driver.Quit() + + dw.driver.close() + } + } +} + +// wrapDriver overrides driver's connection close method. +func (p *driversPool) wrapDriver(d *driver) *driverWrapper { + return &driverWrapper{ + driver: d, + onClose: p.put, + } +} + +// driverWrapper helps to override close for *driver.connection. +type driverWrapper struct { + onClose func(*driverWrapper) + *driver +} + +// close overrides close method of the driver. +func (dw *driverWrapper) close() { + dw.onClose(dw) +} diff --git a/sonic/pool_test.go b/sonic/pool_test.go new file mode 100644 index 0000000..9a48cf7 --- /dev/null +++ b/sonic/pool_test.go @@ -0,0 +1,246 @@ +package sonic_test + +import ( + "errors" + "fmt" + "net" + "testing" + "time" + + "github.com/expectedsh/go-sonic/sonic" +) + +func TestPool_Reconnect(t *testing.T) { + host, port, pass := getSonicConfig(t) + + proxyLn, proxyDoneCh := runTCPProxy(t, + fmt.Sprintf("%s:%d", host, port), // Target addr. + "127.0.0.1:0", // Proxy addr. + ) + + proxyHost, proxyPort := mustSplitHostPort(t, proxyLn.Addr().String()) + + ing, err := sonic.NewIngester( + proxyHost, + proxyPort, + pass, + sonic.OptionPoolPingThreshold(time.Nanosecond), + ) + if err != nil { + t.Fatal("NewIngester", err) + } + + // Connection healthy, ping should work. + + err = ing.Ping() + if err != nil { + t.Fatal("Ping", err) + } + + err = ing.Ping() + if err != nil { + t.Fatal("Ping", err) + } + + // Close connection, ping should not work. + + err = proxyLn.Close() + if err != nil { + t.Fatal("Close", err) + } + + select { + case <-proxyDoneCh: + case <-time.After(2 * time.Second): + t.Fatal("Timeout") + } + + err = ing.Ping() + if err == nil { + t.Fatal("Ping", err) + } + + // Reconnect, ping should work. + + proxyLn, proxyDoneCh = runTCPProxy(t, + fmt.Sprintf("%s:%d", host, port), // Target addr. + fmt.Sprintf("%s:%d", proxyHost, proxyPort), // Proxy addr. + ) + + err = ing.Ping() + if err != nil { + t.Fatal("Ping", err) + } + + err = proxyLn.Close() + if err != nil { + t.Fatal("Close", err) + } + + select { + case <-proxyDoneCh: + case <-time.After(2 * time.Second): + t.Fatal("Timeout") + } +} + +func TestPool_Reconnect_Threshold(t *testing.T) { + host, port, pass := getSonicConfig(t) + + proxyLn, proxyDoneCh := runTCPProxy(t, + fmt.Sprintf("%s:%d", host, port), // Target addr. + "127.0.0.1:0", // Proxy addr. + ) + + proxyHost, proxyPort := mustSplitHostPort(t, proxyLn.Addr().String()) + + ing, err := sonic.NewIngester( + proxyHost, + proxyPort, + pass, + sonic.OptionPoolPingThreshold(time.Minute), + ) + if err != nil { + t.Fatal("NewIngester", err) + } + + // Connection healthy, ping should work. + + err = ing.Ping() + if err != nil { + t.Fatal("Ping", err) + } + + err = ing.Ping() + if err != nil { + t.Fatal("Ping", err) + } + + // Close connection, ping should not work. + + err = proxyLn.Close() + if err != nil { + t.Fatal("Close", err) + } + + select { + case <-proxyDoneCh: + case <-time.After(2 * time.Second): + t.Fatal("Timeout") + } + + err = ing.Ping() + if err == nil { + t.Fatal("Ping", err) + } + + // Reconnect in threshold, ping still should not work. + + proxyLn, proxyDoneCh = runTCPProxy(t, + fmt.Sprintf("%s:%d", host, port), // Target addr. + fmt.Sprintf("%s:%d", proxyHost, proxyPort), // Proxy addr. + ) + + err = ing.Ping() + if err == nil { + t.Fatal("Ping", err) + } + + err = proxyLn.Close() + if err != nil { + t.Fatal("Close", err) + } + + select { + case <-proxyDoneCh: + case <-time.After(2 * time.Second): + t.Fatal("Timeout") + } +} + +func runTCPProxy( + tb testing.TB, + targetAddr string, + proxyAddr string, +) (l net.Listener, doneCh <-chan struct{}) { + tb.Helper() + + l, err := net.Listen("tcp", proxyAddr) + if err != nil { + tb.Fatal("Listen", err) + } + + tb.Cleanup(func() { _ = l.Close() }) + + tb.Log("Proxy:", l.Addr().String(), "To:", l.Addr().String()) + + closeCh := make(chan struct{}) + go func() { + defer close(closeCh) + + for { + inConn, err := l.Accept() + if err != nil { + if !errors.Is(err, net.ErrClosed) { + tb.Log("Accept", err) + } + + return + } + + outConn, err := net.Dial("tcp", targetAddr) + if err != nil { + if !errors.Is(err, net.ErrClosed) { + tb.Error("Dial", err) + } + + return + } + + defer func() { _ = inConn.Close() }() + defer func() { _ = outConn.Close() }() + + go runTCPPipe(tb, inConn, outConn) + go runTCPPipe(tb, outConn, inConn) + } + }() + + return l, closeCh +} + +func runTCPPipe(tb testing.TB, inConn, outConn net.Conn) { + connID := time.Now().UnixNano() + tb.Log("Acepted new conn", connID) + + buff := make([]byte, 0xFF) + for { + _ = inConn.SetReadDeadline(time.Now().Add(5 * time.Second)) + + n, err := inConn.Read(buff) + if err != nil { + if !errors.Is(err, net.ErrClosed) { + tb.Error("inConn.ReadAll", connID, err) + } + + return + } + + _ = inConn.SetReadDeadline(time.Time{}) + _ = outConn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + + tb.Log("read", n, "bytes") + + n, err = outConn.Write(buff[:n]) + if err != nil { + if !errors.Is(err, net.ErrClosed) { + tb.Error("outConn.Write", connID, err) + } + + return + } + + tb.Log("wrote", n, "bytes") + + _ = outConn.SetWriteDeadline(time.Time{}) + } +} diff --git a/sonic/search.go b/sonic/search.go index c067232..116b9ab 100644 --- a/sonic/search.go +++ b/sonic/search.go @@ -32,41 +32,52 @@ const ( ) type searchChannel struct { - *driver + *driversHolder } // NewSearch create a new driver instance with a searchChannel instance. // Only way to get a Searchable implementation. -func NewSearch(host string, port int, password string) (Searchable, error) { - driver := &driver{ - Host: host, - Port: port, - Password: password, - channel: Search, - } - err := driver.Connect() +func NewSearch( + host string, + port int, + password string, + opts ...OptionSetter, +) (Searchable, error) { + driversHolder, err := newDriversHolder(defaultOptions( + host, + port, + password, + Search, + ).With(opts...)) if err != nil { return nil, err } + return searchChannel{ - driver: driver, + driversHolder: driversHolder, }, nil } func (s searchChannel) Query(collection, bucket, term string, limit, offset int, lang Lang) (results []string, err error) { - err = s.write(fmt.Sprintf("%s %s %s \"%s\" LIMIT(%d) OFFSET(%d)"+langFormat(lang), query, collection, bucket, term, limit, offset, lang)) + d, err := s.Get() + if err != nil { + return nil, err + } + defer d.close() + + err = d.write(fmt.Sprintf("%s %s %s \"%s\" LIMIT(%d) OFFSET(%d)"+langFormat(lang), query, collection, bucket, term, limit, offset, lang)) if err != nil { return nil, err } // pending, should be PENDING ID_EVENT - _, err = s.read() + _, err = d.read() if err != nil { return nil, err } // event query, should be EVENT QUERY ID_EVENT RESULT1 RESULT2 ... - read, err := s.read() + read, err := d.read() if err != nil { return nil, err } @@ -74,19 +85,25 @@ func (s searchChannel) Query(collection, bucket, term string, limit, offset int, } func (s searchChannel) Suggest(collection, bucket, word string, limit int) (results []string, err error) { - err = s.write(fmt.Sprintf("%s %s %s \"%s\" LIMIT(%d)", suggest, collection, bucket, word, limit)) + d, err := s.Get() + if err != nil { + return nil, err + } + defer d.close() + + err = d.write(fmt.Sprintf("%s %s %s \"%s\" LIMIT(%d)", suggest, collection, bucket, word, limit)) if err != nil { return nil, err } // pending, should be PENDING ID_EVENT - _, err = s.read() + _, err = d.read() if err != nil { return nil, err } // event query, should be EVENT SUGGEST ID_EVENT RESULT1 RESULT2 ... - read, err := s.read() + read, err := d.read() if err != nil { return nil, err } diff --git a/sonic/search_test.go b/sonic/search_test.go new file mode 100644 index 0000000..3e27a03 --- /dev/null +++ b/sonic/search_test.go @@ -0,0 +1,96 @@ +package sonic_test + +import ( + "strconv" + "testing" + "time" + + "github.com/expectedsh/go-sonic/sonic" +) + +func getSearch(tb testing.TB) sonic.Searchable { + tb.Helper() + + host, port, pass := getSonicConfig(tb) + + srch, err := sonic.NewSearch(host, port, pass) + if err != nil { + tb.Fatal(err) + } + + return srch +} + +func TestSearch(t *testing.T) { + t.Parallel() + + col := t.Name() + bucket := strconv.FormatInt(time.Now().UnixNano(), 10) + + ing := getIngester(t) + srch := getSearch(t) + + t.Cleanup(func() { + _ = ing.FlushBucket(col, bucket) + _ = ing.Quit() + _ = srch.Quit() + }) + + err := ing.Push(col, bucket, "obj1", "xxx", sonic.LangAutoDetect) + if err != nil { + t.Fatal("Push", err) + } + + err = ing.Push(col, bucket, "obj1", "yyy", sonic.LangAutoDetect) + if err != nil { + t.Fatal("Push", err) + } + + t.Run("Query_ok", func(t *testing.T) { + t.Parallel() + + res, err := srch.Query(col, bucket, "xxx", 1, 0, sonic.LangAutoDetect) + switch { + case err != nil: + t.Fatal("Query", err) + case len(res) != 1: + t.Fatalf("Actual: %d, expected: %d", len(res), 1) + } + }) + + t.Run("Query_empty", func(t *testing.T) { + t.Parallel() + + res, err := srch.Query(col, bucket, "zzz", 1, 0, sonic.LangAutoDetect) + switch { + case err != nil: + t.Fatal("Query", err) + case len(res) > 1 && res[0] != "": + t.Fatalf("Actual: %d, expected: %d (%+v)", len(res), 0, res) + } + }) + + t.Run("Query_outOfOffset", func(t *testing.T) { + t.Parallel() + + res, err := srch.Query(col, bucket, "xxx", 1, 1, sonic.LangAutoDetect) + switch { + case err != nil: + t.Fatal("Query", err) + case len(res) > 1 && res[0] != "": + t.Fatalf("Actual: %d, expected: %d (%+v)", len(res), 0, res) + } + }) + + t.Run("Suggest", func(t *testing.T) { + t.Parallel() + + res, err := srch.Suggest(col, bucket, "xx", 1) + switch { + case err != nil: + t.Fatal("Query", err) + case len(res) != 1: + t.Fatalf("Actual: %d, expected: %d (%+v)", len(res), 0, res) + } + }) +} diff --git a/sonic/sonic_test.go b/sonic/sonic_test.go new file mode 100644 index 0000000..2472719 --- /dev/null +++ b/sonic/sonic_test.go @@ -0,0 +1,50 @@ +package sonic_test + +import ( + "net" + "net/url" + "os" + "strconv" + "testing" +) + +func getSonicConfig(tb testing.TB) (host string, portNum int, password string) { + tb.Helper() + + const envAddr = "TEST_SONIC_ADDR" + + sonicAddr := os.Getenv(envAddr) + if sonicAddr == "" { + tb.Fatal(envAddr + " is not set") + } + + u, err := url.Parse(sonicAddr) + if err != nil { + tb.Fatalf("parsing url: %s", err) + } + + portNum, err = strconv.Atoi(u.Port()) + if err != nil { + tb.Fatalf("parsing port: %s", err) + } + + pass, _ := u.User.Password() + + return u.Hostname(), portNum, pass +} + +func mustSplitHostPort(tb testing.TB, hostport string) (host string, port int) { + tb.Helper() + + host, portStr, err := net.SplitHostPort(hostport) + if err != nil { + tb.Fatal("SplitHostPort", err) + } + + proxyPort, err := strconv.Atoi(portStr) + if err != nil { + tb.Fatal("Atoi", err) + } + + return host, proxyPort +} diff --git a/testdata/Dockerfile b/testdata/Dockerfile new file mode 100644 index 0000000..e1e5a04 --- /dev/null +++ b/testdata/Dockerfile @@ -0,0 +1,3 @@ +FROM valeriansaliou/sonic:v1.3.2 + +RUN apt update && apt install -y netcat diff --git a/testdata/sonic.cfg b/testdata/sonic.cfg new file mode 100644 index 0000000..bb2869d --- /dev/null +++ b/testdata/sonic.cfg @@ -0,0 +1,66 @@ +# Sonic +# Fast, lightweight and schema-less search backend +# Configuration file +# Example: https://github.com/valeriansaliou/sonic/blob/master/config.cfg + + +[server] + +log_level = "debug" + + +[channel] + +inet = "0.0.0.0:1491" +tcp_timeout = 300 + +auth_password = "SecretPassword" + +[channel.search] + +query_limit_default = 10 +query_limit_maximum = 100 +query_alternates_try = 4 + +suggest_limit_default = 5 +suggest_limit_maximum = 20 + + +[store] + +[store.kv] + +path = "./data/store/kv/" + +retain_word_objects = 1000 + +[store.kv.pool] + +inactive_after = 1800 + +[store.kv.database] + +flush_after = 900 + +compress = true +parallelism = 2 +max_files = 100 +max_compactions = 1 +max_flushes = 1 +write_buffer = 16384 +write_ahead_log = true + +[store.fst] + +path = "./data/store/fst/" + +[store.fst.pool] + +inactive_after = 300 + +[store.fst.graph] + +consolidate_after = 180 + +max_size = 2048 +max_words = 250000 \ No newline at end of file From 242d3f611caebd0ce4f1417a66b69efd692a438f Mon Sep 17 00:00:00 2001 From: Kryvchun Date: Tue, 7 Mar 2023 15:16:52 +0200 Subject: [PATCH 2/3] fix: create minIdle connections at start --- sonic/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sonic/pool.go b/sonic/pool.go index 0991407..4cd0b21 100644 --- a/sonic/pool.go +++ b/sonic/pool.go @@ -40,7 +40,7 @@ func newDriversPool( // Open connnections. drivers := make([]*driverWrapper, 0, minIdle) - for i := 0; i < maxIdle; i++ { + for i := 0; i < minIdle; i++ { dw, err = dp.Get() if err != nil { // We still need to close already opened connections. From b400eb6c145f0522c3810422efa30f5c539929d8 Mon Sep 17 00:00:00 2001 From: Kryvchun Date: Tue, 11 Apr 2023 08:43:29 +0300 Subject: [PATCH 3/3] feat: safely quote terms --- sonic/search.go | 11 ++++++++++- sonic/search_test.go | 45 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/sonic/search.go b/sonic/search.go index 116b9ab..2ef1fa5 100644 --- a/sonic/search.go +++ b/sonic/search.go @@ -65,7 +65,16 @@ func (s searchChannel) Query(collection, bucket, term string, limit, offset int, } defer d.close() - err = d.write(fmt.Sprintf("%s %s %s \"%s\" LIMIT(%d) OFFSET(%d)"+langFormat(lang), query, collection, bucket, term, limit, offset, lang)) + err = d.write(fmt.Sprintf( + "%s %s %s %q LIMIT(%d) OFFSET(%d)"+langFormat(lang), + query, + collection, + bucket, + term, + limit, + offset, + lang, + )) if err != nil { return nil, err } diff --git a/sonic/search_test.go b/sonic/search_test.go index 3e27a03..6535650 100644 --- a/sonic/search_test.go +++ b/sonic/search_test.go @@ -58,6 +58,51 @@ func TestSearch(t *testing.T) { } }) + t.Run("Query_quote", func(t *testing.T) { + t.Parallel() + + _, err := srch.Query(col, bucket, `'quote' "hello"`, 1, 0, sonic.LangAutoDetect) + if err != nil { + t.Fatal("Query", err) + } + }) + + t.Run("Query_escape", func(t *testing.T) { + t.Parallel() + + _, err := srch.Query(col, bucket, `escape symbol \`, 1, 0, sonic.LangAutoDetect) + if err != nil { + t.Fatal("Query", err) + } + }) + + t.Run("Query_tab", func(t *testing.T) { + t.Parallel() + + _, err := srch.Query(col, bucket, "\t", 1, 0, sonic.LangAutoDetect) + if err != nil { + t.Fatal("Query", err) + } + }) + + t.Run("Query_space", func(t *testing.T) { + t.Parallel() + + _, err := srch.Query(col, bucket, " ", 1, 0, sonic.LangAutoDetect) + if err == nil { + t.Fatal("Expected error, but got nil") + } + }) + + t.Run("Query_empty", func(t *testing.T) { + t.Parallel() + + _, err := srch.Query(col, bucket, "", 1, 0, sonic.LangAutoDetect) + if err == nil { + t.Fatal("Expected error, but got nil") + } + }) + t.Run("Query_empty", func(t *testing.T) { t.Parallel()