Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry to get session when session is invalid in SessionPool #252

Merged
merged 5 commits into from
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
**IMPORTANT: Code of Nebula go client has been transferred from [nebula-clients](https://github.com/vesoft-inc/nebula-clients) to this repository(nebula-go), and new releases in the future will be published in this repository.
Please update your go.mod and imports correspondingly.**

Official Nebula Go client which communicates with the server using [fbthrift](https://github.com/facebook/fbthrift/). Currently the latest stable release is **[v3.3.0](https://github.com/vesoft-inc/nebula-go/tree/release-v3.3)**
Official Nebula Go client which communicates with the server using [fbthrift](https://github.com/facebook/fbthrift/). Currently the latest stable release is **[v3.4.0](https://github.com/vesoft-inc/nebula-go/tree/release-v3.4)**

The code in **master branch** will be updated to accommodate the nightly changes made in NebulaGraph.
To Use the console with a stable release of NebulaGraph, please check the branches and use the corresponding version.
Expand All @@ -22,6 +22,7 @@ To Use the console with a stable release of NebulaGraph, please check the branch
| **[v3.1.x](https://github.com/vesoft-inc/nebula-go/tree/v3.1.0)** | 3.1.x |
| **[v3.2.x](https://github.com/vesoft-inc/nebula-go/tree/v3.2.0)** | 3.1.x-3.2.x |
| **[v3.3.x](https://github.com/vesoft-inc/nebula-go/tree/v3.3.0)** | 3.1.x-3.3.x |
| **[v3.4.x](https://github.com/vesoft-inc/nebula-go/tree/v3.4.0)** | 3.1.x-3.4.x |
| **[master](https://github.com/vesoft-inc/nebula-go/tree/master)** | 3.x-nightly |

Please be careful not to modify the files in the nebula directory, these codes were all generated by fbthrift.
Expand All @@ -37,7 +38,7 @@ $ go get -u -v github.com/vesoft-inc/nebula-go/v3@master
You can specify the version of Nebula-go by substituting `<tag>` in `$ go get -u -v github.com/vesoft-inc/nebula-go@<tag>`.
For example:

for v3: `$ go get -u -v github.com/vesoft-inc/nebula-go/v3@v3.3.0`
for v3: `$ go get -u -v github.com/vesoft-inc/nebula-go/v3@v3.4.0`

for v2: `$ go get -u -v github.com/vesoft-inc/nebula-go/[email protected]`

Expand Down
20 changes: 6 additions & 14 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1202,15 +1202,11 @@ func TestReconnect(t *testing.T) {
defer pool.Close()

// Create session
var sessionList []*Session

for i := 0; i < 3; i++ {
session, err := pool.GetSession(username, password)
if err != nil {
t.Errorf("fail to create a new session from connection pool, %s", err.Error())
}
sessionList = append(sessionList, session)
session, err := pool.GetSession(username, password)
if err != nil {
t.Errorf("fail to create a new session from connection pool, %s", err.Error())
}
defer session.Release()

// Send query to server periodically
for i := 0; i < timeoutConfig.MaxConnPoolSize; i++ {
Expand All @@ -1221,7 +1217,7 @@ func TestReconnect(t *testing.T) {
if i == 7 {
stopContainer(t, "nebula-docker-compose_graphd1_1")
}
_, err := sessionList[0].Execute("SHOW HOSTS;")
_, err := session.Execute("SHOW HOSTS;")
fmt.Println("Sending query...")

if err != nil {
Expand All @@ -1230,7 +1226,7 @@ func TestReconnect(t *testing.T) {
}
}

resp, err := sessionList[0].Execute("SHOW HOSTS;")
resp, err := session.Execute("SHOW HOSTS;")
if err != nil {
t.Fatalf(err.Error())
return
Expand All @@ -1240,10 +1236,6 @@ func TestReconnect(t *testing.T) {
startContainer(t, "nebula-docker-compose_graphd_1")
startContainer(t, "nebula-docker-compose_graphd1_1")

for i := 0; i < len(sessionList); i++ {
sessionList[i].Release()
}

// Wait for graphd to be up
time.Sleep(5 * time.Second)
}
Expand Down
32 changes: 17 additions & 15 deletions configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,13 @@ func openAndReadFile(path string) ([]byte, error) {
// SessionPoolConf is the configs of a session pool
// Note that the space name is bound to the session pool for its lifetime
type SessionPoolConf struct {
username string // username for authentication
password string // password for authentication
serviceAddrs []HostAddress // service addresses for session pool
hostIndex int // index of the host in ServiceAddrs that the next new session will connect to
spaceName string // The space name that all sessions in the pool are bound to
sslConfig *tls.Config // Optional SSL config for the connection
username string // username for authentication
password string // password for authentication
serviceAddrs []HostAddress // service addresses for session pool
hostIndex int // index of the host in ServiceAddrs that the next new session will connect to
spaceName string // The space name that all sessions in the pool are bound to
sslConfig *tls.Config // Optional SSL config for the connection
retryGetSessionTimes int // The max times to retry get new session when executing a query

// Basic pool configs
// Socket timeout and Socket connection timeout, unit: seconds
Expand All @@ -143,15 +144,16 @@ func NewSessionPoolConf(
spaceName string, opts ...SessionPoolConfOption) (*SessionPoolConf, error) {
// Set default values for basic pool configs
newPoolConf := SessionPoolConf{
username: username,
password: password,
serviceAddrs: serviceAddrs,
spaceName: spaceName,
timeOut: 0 * time.Millisecond,
idleTime: 0 * time.Millisecond,
maxSize: 30,
minSize: 1,
hostIndex: 0,
username: username,
password: password,
serviceAddrs: serviceAddrs,
spaceName: spaceName,
retryGetSessionTimes: 1,
timeOut: 0 * time.Millisecond,
idleTime: 0 * time.Millisecond,
maxSize: 30,
minSize: 1,
hostIndex: 0,
}

// Iterate the given options and apply them to the config.
Expand Down
4 changes: 2 additions & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,15 @@ func (session *Session) ExecuteJsonWithParameter(stmt string, params map[string]
}

func (session *Session) reConnect() error {
newconnection, err := session.connPool.getIdleConn()
newConnection, err := session.connPool.getIdleConn()
if err != nil {
err = fmt.Errorf(err.Error())
return err
}

// Release connection to pool
session.connPool.release(session.connection)
session.connection = newconnection
session.connection = newConnection
return nil
}

Expand Down
56 changes: 55 additions & 1 deletion session_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ package nebula_go
import (
"container/list"
"fmt"
"strconv"
"sync"
"time"

"github.com/vesoft-inc/nebula-go/v3/nebula"
"github.com/vesoft-inc/nebula-go/v3/nebula/graph"
)

// SessionPool is a pool that manages sessions internally.
Expand Down Expand Up @@ -117,7 +119,15 @@ func (pool *SessionPool) ExecuteWithParameter(stmt string, params map[string]int
}

// Execute the query
resp, err := session.connection.executeWithParameter(session.sessionID, stmt, paramsMap)
execFunc := func(s *Session) (*graph.ExecutionResponse, error) {
resp, err := s.connection.executeWithParameter(s.sessionID, stmt, paramsMap)
if err != nil {
return nil, err
}
return resp, nil
}

resp, err := pool.executeWithRetry(session, execFunc, pool.conf.retryGetSessionTimes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -384,6 +394,50 @@ func (pool *SessionPool) getIdleSession() (*Session, error) {
" session pool and the total session count has reached the limit")
}

// retryGetSession tries to create a new session when the current session is invalid.
func (pool *SessionPool) executeWithRetry(
session *Session,
f func(*Session) (*graph.ExecutionResponse, error),
retry int) (*graph.ExecutionResponse, error) {
pool.rwLock.Lock()
defer pool.rwLock.Unlock()
Comment on lines +402 to +403
Copy link
Contributor

@veezhang veezhang Feb 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be possible to reduce the granularity of lock?
Only need to lock *list.List, not during session creation, execution, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we should recheck the lock usage later.


resp, err := f(session)
if err != nil {
pool.removeSessionFromList(&pool.activeSessions, session)
return nil, err
}

if resp.ErrorCode == nebula.ErrorCode_SUCCEEDED {
return resp, nil
} else if ErrorCode(resp.ErrorCode) != ErrorCode_E_SESSION_INVALID { // only retry when the session is invalid
return resp, err
}

// remove invalid session regardless of the retry is successful or not
defer pool.removeSessionFromList(&pool.activeSessions, session)
// If the session is invalid, close it and get a new session
for i := 0; i < retry; i++ {
pool.log.Info("retry to get sessions")
newSession, err := pool.newSession()
if err != nil {
return nil, err
}

pingErr := newSession.Ping()
if pingErr != nil {
pool.log.Error("failed to ping the session, error: " + pingErr.Error())
continue
}
pool.log.Info("retry to get sessions successfully")
pool.addSessionToList(&pool.activeSessions, newSession)

return f(newSession)
}
pool.log.Error(fmt.Sprintf("failed to get session after " + strconv.Itoa(retry) + " retries"))
return nil, fmt.Errorf("failed to get session after %d retries", retry)
}

// startCleaner starts sessionCleaner if idleTime > 0.
func (pool *SessionPool) startCleaner() {
if pool.conf.idleTime > 0 && pool.cleanerChan == nil {
Expand Down
44 changes: 44 additions & 0 deletions session_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,50 @@ func TestIdleSessionCleaner(t *testing.T) {
sessionPool.conf.minSize, sessionPool.GetTotalSessionCount())
}

func TestRetryGetSession(t *testing.T) {
err := prepareSpace("client_test")
if err != nil {
t.Fatal(err)
}
defer dropSpace("client_test")

hostAddress := HostAddress{Host: address, Port: port}
config, err := NewSessionPoolConf(
"root",
"nebula",
[]HostAddress{hostAddress},
"client_test")
if err != nil {
t.Errorf("failed to create session pool config, %s", err.Error())
}
config.minSize = 2
config.maxSize = 2
config.retryGetSessionTimes = 1

// create session pool
sessionPool, err := NewSessionPool(*config, DefaultLogger{})
if err != nil {
t.Fatal(err)
}
defer sessionPool.Close()

// kill all sessions in the cluster
resultSet, err := sessionPool.Execute("SHOW SESSIONS | KILL SESSIONS $-.SessionId")
if err != nil {
t.Fatal(err)
}
assert.True(t, resultSet.IsSucceed(), fmt.Errorf("error code: %d, error msg: %s",
resultSet.GetErrorCode(), resultSet.GetErrorMsg()))

// execute query, it should retry to get session
resultSet, err = sessionPool.Execute("SHOW HOSTS;")
if err != nil {
t.Fatal(err)
}
assert.True(t, resultSet.IsSucceed(), fmt.Errorf("error code: %d, error msg: %s",
resultSet.GetErrorCode(), resultSet.GetErrorMsg()))
}

func BenchmarkConcurrency(b *testing.B) {
err := prepareSpace("client_test")
if err != nil {
Expand Down