Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement connections pool #23

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea/*
.idea/**/*
.idea
.idea
.vscode
11 changes: 9 additions & 2 deletions cmd/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@ package main

import (
"fmt"
"time"

"github.com/expectedsh/go-sonic/sonic"
)

const pswd = "SecretPassword"

func main() {

ingester, err := sonic.NewIngester("localhost", 1491, pswd)
ingester, err := sonic.NewIngester(
"localhost",
1491,
pswd,
sonic.OptionPoolMaxIdleConnections(16),
sonic.OptionPoolMinIdleConnections(1),
sonic.OptionPoolPingThreshold(time.Minute),
)
if err != nil {
panic(err)
}
Expand Down
42 changes: 42 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
version: "3"

# This file is used for testing the library.
#
# Running tests in docker:
# docker-compose run --rm test
#
# Running tests locally:
# export TEST_SONIC_ADDR="sonic://:[email protected]:1491"
# docker-compose up -d sonic
# go test ./...
#
# Cleanup:
# docker-compose down

services:
sonic:
build:
context: ./testdata
volumes:
- ./testdata/sonic.cfg:/etc/sonic.cfg:ro
ports:
- "127.0.0.1:1491:1491"
environment:
RUST_BACKTRACE: "full"
healthcheck:
test: nc -z 127.0.0.1 1491
interval: 5s
timeout: 3s
retries: 7

test:
image: "golang:1.18.3"
depends_on:
sonic:
condition: service_healthy
volumes:
- .:/app:ro
environment:
- TEST_SONIC_ADDR=sonic://:SecretPassword@sonic:1491
command: bash -c "cd /app && go test ./..."
38 changes: 1 addition & 37 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
)

func main() {

ingester, err := sonic.NewIngester("localhost", 1491, "SecretPassword")
if err != nil {
panic(err)
Expand Down Expand Up @@ -67,39 +66,4 @@ Hardware detail: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz

### Thread Safety

The driver itself isn't thread safe. You could use locks or channels to avoid crashes.

```go
package main

import (
"fmt"

"github.com/expectedsh/go-sonic/sonic"
)

func main() {
events := make(chan []string, 1)

event := []string{"some_text", "some_id"}
tryCrash := func() {
for {
// replace "event" with whatever is giving you events: pubsub, amqp messages…
events <- event
}
}

go tryCrash()
go tryCrash()
go tryCrash()
go tryCrash()

ingester, _ := sonic.NewIngester("localhost", 1491, "SecretPassword")

for {
msg := <-events
// Or use some buffering along with BulkPush
ingester.Push("collection", "bucket", msg[1], msg[0])
}
}
```
The driver itself uses a connections pool, and it is thread-safe.
26 changes: 14 additions & 12 deletions sonic/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,40 @@ type Controllable interface {

// controlChannel is used for administration purposes.
type controlChannel struct {
*driver
*driversHolder
}

// NewControl create a new driver instance with a controlChannel instance.
// Only way to get a Controllable implementation.
func NewControl(host string, port int, password string) (Controllable, error) {
driver := &driver{
Host: host,
Port: port,
Password: password,
channel: Control,
}
err := driver.Connect()
func NewControl(host string, port int, password string, opts ...OptionSetter) (Controllable, error) {
driversHolder, err := newDriversHolder(defaultOptions(host, port, password, Control).With(opts...))
if err != nil {
return nil, err
}

return controlChannel{
driver: driver,
driversHolder: driversHolder,
}, nil
}

func (c controlChannel) Trigger(action Action) (err error) {
if !IsActionValid(action) {
return ErrActionName
}
err = c.write(fmt.Sprintf("TRIGGER %s", action))

d, err := c.Get()
if err != nil {
return err
}
defer d.close()

err = d.write(fmt.Sprintf("TRIGGER %s", action))
if err != nil {
return err
}

// should get OK
_, err = c.read()
_, err = d.read()
if err != nil {
return err
}
Expand Down
50 changes: 50 additions & 0 deletions sonic/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package sonic

// driversHolder defines base interface around driversPool.
type driversHolder struct {
*driversPool
}

func newDriversHolder(
opts controllerOptions,
) (*driversHolder, error) {
df := driverFactory{
Host: opts.Host,
Port: opts.Port,
Password: opts.Password,
Channel: opts.Channel,
}

dp, err := newDriversPool(
&df,
opts.PoolMinConnections,
opts.PoolMaxConnections,
opts.PoolPingThreshold,
opts.PoolMaxIdleLifetime,
)
if err != nil {
return nil, err
}

return &driversHolder{
driversPool: dp,
}, nil
}

// Quit all connections and close the pool. It never returns an error.
func (c *driversHolder) Quit() error {
c.driversPool.Close()

return nil
}

// Ping one connection.
func (c *driversHolder) Ping() error {
d, err := c.Get()
if err != nil {
return err
}
defer d.close()

return d.Ping()
}
28 changes: 28 additions & 0 deletions sonic/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package sonic_test

import (
"testing"

"github.com/expectedsh/go-sonic/sonic"
)

func TestController(t *testing.T) {
t.Parallel()

var ctrl sonic.Base = getIngester(t)

err := ctrl.Ping()
if err != nil {
t.Fatal("Ping", err)
}

err = ctrl.Quit()
if err != nil {
t.Fatal("Quit", err)
}

err = ctrl.Ping()
if err == nil {
t.Fatal("Ping", err)
}
}
47 changes: 46 additions & 1 deletion sonic/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sonic

import (
"errors"
"time"
)

var (
Expand Down Expand Up @@ -32,18 +33,44 @@ type driver struct {
Port int
Password string

lastUse time.Time
lastPing time.Time

channel Channel
*connection
}

type driverFactory struct {
Host string
Port int
Password string
Channel Channel
}

func (df driverFactory) Build() *driver {
return &driver{
Host: df.Host,
Port: df.Port,
Password: df.Password,
channel: df.Channel,

lastPing: time.Time{},
connection: nil,
}
}

// Connect open a connection via TCP with the sonic server.
func (c *driver) Connect() error {
if !IsChannelValid(c.channel) {
return ErrChanName
}

var err error

c.connection, err = newConnection(c)
c.lastPing = time.Now()
c.lastUse = time.Now()

return err
}

Expand All @@ -60,7 +87,7 @@ func (c *driver) Quit() error {
return err
}

func (c driver) Ping() error {
func (c *driver) Ping() error {
err := c.write("PING")
if err != nil {
return err
Expand All @@ -71,5 +98,23 @@ func (c driver) Ping() error {
if err != nil {
return err
}

c.lastPing = time.Now()

return nil
}

// softPing pings the connection if it wasn't pinged for a while.
func (c *driver) checkConn(pingThreshold, maxLifetime time.Duration) (ok bool) {
if maxLifetime > 0 && time.Since(c.lastUse) > maxLifetime {
return false
}

c.lastUse = time.Now()

if pingThreshold > 0 && time.Since(c.lastPing) > pingThreshold {
return c.Ping() == nil
}

return true
}
Loading