Skip to content

Commit

Permalink
feat: support connections pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Kryvchun committed Dec 26, 2022
1 parent d31eb03 commit 2367a15
Show file tree
Hide file tree
Showing 18 changed files with 1,104 additions and 141 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea/*
.idea/**/*
.idea
.idea
.vscode
11 changes: 9 additions & 2 deletions cmd/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@ package main

import (
"fmt"
"time"

"github.com/expectedsh/go-sonic/sonic"
)

const pswd = "SecretPassword"

func main() {

ingester, err := sonic.NewIngester("localhost", 1491, pswd)
ingester, err := sonic.NewIngester(
"localhost",
1491,
pswd,
sonic.OptionPoolMaxIdleConnections(16),
sonic.OptionPoolMinIdleConnections(1),
sonic.OptionPoolPingThreshold(time.Minute),
)
if err != nil {
panic(err)
}
Expand Down
42 changes: 42 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
version: "3"

# This file is used for testing the library.
#
# Running tests in docker:
# docker-compose run --rm test
#
# Running tests locally:
# export TEST_SONIC_ADDR="sonic://:[email protected]:1491"
# docker-compose up -d sonic
# go test ./...
#
# Cleanup:
# docker-compose down

services:
sonic:
build:
context: ./testdata
volumes:
- ./testdata/sonic.cfg:/etc/sonic.cfg:ro
ports:
- "127.0.0.1:1491:1491"
environment:
RUST_BACKTRACE: "full"
healthcheck:
test: nc -z 127.0.0.1 1491
interval: 5s
timeout: 3s
retries: 7

test:
image: "golang:1.18.3"
depends_on:
sonic:
condition: service_healthy
volumes:
- .:/app:ro
environment:
- TEST_SONIC_ADDR=sonic://:SecretPassword@sonic:1491
command: bash -c "cd /app && go test ./..."
38 changes: 1 addition & 37 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
)

func main() {

ingester, err := sonic.NewIngester("localhost", 1491, "SecretPassword")
if err != nil {
panic(err)
Expand Down Expand Up @@ -67,39 +66,4 @@ Hardware detail: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz

### Thread Safety

The driver itself isn't thread safe. You could use locks or channels to avoid crashes.

```go
package main

import (
"fmt"

"github.com/expectedsh/go-sonic/sonic"
)

func main() {
events := make(chan []string, 1)

event := []string{"some_text", "some_id"}
tryCrash := func() {
for {
// replace "event" with whatever is giving you events: pubsub, amqp messages…
events <- event
}
}

go tryCrash()
go tryCrash()
go tryCrash()
go tryCrash()

ingester, _ := sonic.NewIngester("localhost", 1491, "SecretPassword")

for {
msg := <-events
// Or use some buffering along with BulkPush
ingester.Push("collection", "bucket", msg[1], msg[0])
}
}
```
The driver itself uses a connections pool, and it is thread-safe.
26 changes: 14 additions & 12 deletions sonic/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,40 @@ type Controllable interface {

// controlChannel is used for administration purposes.
type controlChannel struct {
*driver
*driversHolder
}

// NewControl create a new driver instance with a controlChannel instance.
// Only way to get a Controllable implementation.
func NewControl(host string, port int, password string) (Controllable, error) {
driver := &driver{
Host: host,
Port: port,
Password: password,
channel: Control,
}
err := driver.Connect()
func NewControl(host string, port int, password string, opts ...OptionSetter) (Controllable, error) {
driversHolder, err := newDriversHolder(defaultOptions(host, port, password, Control).With(opts...))
if err != nil {
return nil, err
}

return controlChannel{
driver: driver,
driversHolder: driversHolder,
}, nil
}

func (c controlChannel) Trigger(action Action) (err error) {
if !IsActionValid(action) {
return ErrActionName
}
err = c.write(fmt.Sprintf("TRIGGER %s", action))

d, err := c.Get()
if err != nil {
return err
}
defer d.close()

err = d.write(fmt.Sprintf("TRIGGER %s", action))
if err != nil {
return err
}

// should get OK
_, err = c.read()
_, err = d.read()
if err != nil {
return err
}
Expand Down
50 changes: 50 additions & 0 deletions sonic/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package sonic

// driversHolder defines base interface around driversPool.
type driversHolder struct {
*driversPool
}

func newDriversHolder(
opts controllerOptions,
) (*driversHolder, error) {
df := driverFactory{
Host: opts.Host,
Port: opts.Port,
Password: opts.Password,
Channel: opts.Channel,
}

dp, err := newDriversPool(
&df,
opts.PoolMinConnections,
opts.PoolMaxConnections,
opts.PoolPingThreshold,
opts.PoolMaxIdleLifetime,
)
if err != nil {
return nil, err
}

return &driversHolder{
driversPool: dp,
}, nil
}

// Quit all connections and close the pool. It never returns an error.
func (c *driversHolder) Quit() error {
c.driversPool.Close()

return nil
}

// Ping one connection.
func (c *driversHolder) Ping() error {
d, err := c.Get()
if err != nil {
return err
}
defer d.close()

return d.Ping()
}
28 changes: 28 additions & 0 deletions sonic/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package sonic_test

import (
"testing"

"github.com/expectedsh/go-sonic/sonic"
)

func TestController(t *testing.T) {
t.Parallel()

var ctrl sonic.Base = getIngester(t)

err := ctrl.Ping()
if err != nil {
t.Fatal("Ping", err)
}

err = ctrl.Quit()
if err != nil {
t.Fatal("Quit", err)
}

err = ctrl.Ping()
if err == nil {
t.Fatal("Ping", err)
}
}
47 changes: 46 additions & 1 deletion sonic/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sonic

import (
"errors"
"time"
)

var (
Expand Down Expand Up @@ -32,18 +33,44 @@ type driver struct {
Port int
Password string

lastUse time.Time
lastPing time.Time

channel Channel
*connection
}

type driverFactory struct {
Host string
Port int
Password string
Channel Channel
}

func (df driverFactory) Build() *driver {
return &driver{
Host: df.Host,
Port: df.Port,
Password: df.Password,
channel: df.Channel,

lastPing: time.Time{},
connection: nil,
}
}

// Connect open a connection via TCP with the sonic server.
func (c *driver) Connect() error {
if !IsChannelValid(c.channel) {
return ErrChanName
}

var err error

c.connection, err = newConnection(c)
c.lastPing = time.Now()
c.lastUse = time.Now()

return err
}

Expand All @@ -60,7 +87,7 @@ func (c *driver) Quit() error {
return err
}

func (c driver) Ping() error {
func (c *driver) Ping() error {
err := c.write("PING")
if err != nil {
return err
Expand All @@ -71,5 +98,23 @@ func (c driver) Ping() error {
if err != nil {
return err
}

c.lastPing = time.Now()

return nil
}

// softPing pings the connection if it wasn't pinged for a while.
func (c *driver) checkConn(pingThreshold, maxLifetime time.Duration) (ok bool) {
if maxLifetime > 0 && time.Since(c.lastUse) > maxLifetime {
return false
}

c.lastUse = time.Now()

if pingThreshold > 0 && time.Since(c.lastPing) > pingThreshold {
return c.Ping() == nil
}

return true
}
Loading

0 comments on commit 2367a15

Please sign in to comment.