diff --git a/README.md b/README.md index 69d1e785..6b8c8301 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ **IMPORTANT: Code of Nebula go client has been transferred from [nebula-clients](https://github.com/vesoft-inc/nebula-clients) to this repository(nebula-go), and new releases in the future will be published in this repository. Please update your go.mod and imports correspondingly.** -Official Nebula Go client which communicates with the server using [fbthrift](https://github.com/facebook/fbthrift/). Currently the latest stable release is **[v3.3.0](https://github.com/vesoft-inc/nebula-go/tree/release-v3.3)** +Official Nebula Go client which communicates with the server using [fbthrift](https://github.com/facebook/fbthrift/). Currently the latest stable release is **[v3.4.0](https://github.com/vesoft-inc/nebula-go/tree/release-v3.4)** The code in **master branch** will be updated to accommodate the nightly changes made in NebulaGraph. To Use the console with a stable release of NebulaGraph, please check the branches and use the corresponding version. @@ -22,6 +22,7 @@ To Use the console with a stable release of NebulaGraph, please check the branch | **[v3.1.x](https://github.com/vesoft-inc/nebula-go/tree/v3.1.0)** | 3.1.x | | **[v3.2.x](https://github.com/vesoft-inc/nebula-go/tree/v3.2.0)** | 3.1.x-3.2.x | | **[v3.3.x](https://github.com/vesoft-inc/nebula-go/tree/v3.3.0)** | 3.1.x-3.3.x | +| **[v3.4.x](https://github.com/vesoft-inc/nebula-go/tree/v3.4.0)** | 3.1.x-3.4.x | | **[master](https://github.com/vesoft-inc/nebula-go/tree/master)** | 3.x-nightly | Please be careful not to modify the files in the nebula directory, these codes were all generated by fbthrift. @@ -37,7 +38,7 @@ $ go get -u -v github.com/vesoft-inc/nebula-go/v3@master You can specify the version of Nebula-go by substituting `` in `$ go get -u -v github.com/vesoft-inc/nebula-go@`. For example: - for v3: `$ go get -u -v github.com/vesoft-inc/nebula-go/v3@v3.3.0` + for v3: `$ go get -u -v github.com/vesoft-inc/nebula-go/v3@v3.4.0` for v2: `$ go get -u -v github.com/vesoft-inc/nebula-go/v2@v2.6.0` diff --git a/client_test.go b/client_test.go index 97cc8b18..729d2c31 100644 --- a/client_test.go +++ b/client_test.go @@ -1202,15 +1202,11 @@ func TestReconnect(t *testing.T) { defer pool.Close() // Create session - var sessionList []*Session - - for i := 0; i < 3; i++ { - session, err := pool.GetSession(username, password) - if err != nil { - t.Errorf("fail to create a new session from connection pool, %s", err.Error()) - } - sessionList = append(sessionList, session) + session, err := pool.GetSession(username, password) + if err != nil { + t.Errorf("fail to create a new session from connection pool, %s", err.Error()) } + defer session.Release() // Send query to server periodically for i := 0; i < timeoutConfig.MaxConnPoolSize; i++ { @@ -1221,7 +1217,7 @@ func TestReconnect(t *testing.T) { if i == 7 { stopContainer(t, "nebula-docker-compose_graphd1_1") } - _, err := sessionList[0].Execute("SHOW HOSTS;") + _, err := session.Execute("SHOW HOSTS;") fmt.Println("Sending query...") if err != nil { @@ -1230,7 +1226,7 @@ func TestReconnect(t *testing.T) { } } - resp, err := sessionList[0].Execute("SHOW HOSTS;") + resp, err := session.Execute("SHOW HOSTS;") if err != nil { t.Fatalf(err.Error()) return @@ -1240,10 +1236,6 @@ func TestReconnect(t *testing.T) { startContainer(t, "nebula-docker-compose_graphd_1") startContainer(t, "nebula-docker-compose_graphd1_1") - for i := 0; i < len(sessionList); i++ { - sessionList[i].Release() - } - // Wait for graphd to be up time.Sleep(5 * time.Second) } diff --git a/configs.go b/configs.go index b9aae63c..01a188b5 100644 --- a/configs.go +++ b/configs.go @@ -112,12 +112,13 @@ func openAndReadFile(path string) ([]byte, error) { // SessionPoolConf is the configs of a session pool // Note that the space name is bound to the session pool for its lifetime type SessionPoolConf struct { - username string // username for authentication - password string // password for authentication - serviceAddrs []HostAddress // service addresses for session pool - hostIndex int // index of the host in ServiceAddrs that the next new session will connect to - spaceName string // The space name that all sessions in the pool are bound to - sslConfig *tls.Config // Optional SSL config for the connection + username string // username for authentication + password string // password for authentication + serviceAddrs []HostAddress // service addresses for session pool + hostIndex int // index of the host in ServiceAddrs that the next new session will connect to + spaceName string // The space name that all sessions in the pool are bound to + sslConfig *tls.Config // Optional SSL config for the connection + retryGetSessionTimes int // The max times to retry get new session when executing a query // Basic pool configs // Socket timeout and Socket connection timeout, unit: seconds @@ -143,15 +144,16 @@ func NewSessionPoolConf( spaceName string, opts ...SessionPoolConfOption) (*SessionPoolConf, error) { // Set default values for basic pool configs newPoolConf := SessionPoolConf{ - username: username, - password: password, - serviceAddrs: serviceAddrs, - spaceName: spaceName, - timeOut: 0 * time.Millisecond, - idleTime: 0 * time.Millisecond, - maxSize: 30, - minSize: 1, - hostIndex: 0, + username: username, + password: password, + serviceAddrs: serviceAddrs, + spaceName: spaceName, + retryGetSessionTimes: 1, + timeOut: 0 * time.Millisecond, + idleTime: 0 * time.Millisecond, + maxSize: 30, + minSize: 1, + hostIndex: 0, } // Iterate the given options and apply them to the config. diff --git a/session.go b/session.go index 3b611b2f..8cf23e43 100644 --- a/session.go +++ b/session.go @@ -198,7 +198,7 @@ func (session *Session) ExecuteJsonWithParameter(stmt string, params map[string] } func (session *Session) reConnect() error { - newconnection, err := session.connPool.getIdleConn() + newConnection, err := session.connPool.getIdleConn() if err != nil { err = fmt.Errorf(err.Error()) return err @@ -206,7 +206,7 @@ func (session *Session) reConnect() error { // Release connection to pool session.connPool.release(session.connection) - session.connection = newconnection + session.connection = newConnection return nil } diff --git a/session_pool.go b/session_pool.go index 830ea94c..e778a5c3 100644 --- a/session_pool.go +++ b/session_pool.go @@ -11,10 +11,12 @@ package nebula_go import ( "container/list" "fmt" + "strconv" "sync" "time" "github.com/vesoft-inc/nebula-go/v3/nebula" + "github.com/vesoft-inc/nebula-go/v3/nebula/graph" ) // SessionPool is a pool that manages sessions internally. @@ -117,7 +119,15 @@ func (pool *SessionPool) ExecuteWithParameter(stmt string, params map[string]int } // Execute the query - resp, err := session.connection.executeWithParameter(session.sessionID, stmt, paramsMap) + execFunc := func(s *Session) (*graph.ExecutionResponse, error) { + resp, err := s.connection.executeWithParameter(s.sessionID, stmt, paramsMap) + if err != nil { + return nil, err + } + return resp, nil + } + + resp, err := pool.executeWithRetry(session, execFunc, pool.conf.retryGetSessionTimes) if err != nil { return nil, err } @@ -384,6 +394,50 @@ func (pool *SessionPool) getIdleSession() (*Session, error) { " session pool and the total session count has reached the limit") } +// retryGetSession tries to create a new session when the current session is invalid. +func (pool *SessionPool) executeWithRetry( + session *Session, + f func(*Session) (*graph.ExecutionResponse, error), + retry int) (*graph.ExecutionResponse, error) { + pool.rwLock.Lock() + defer pool.rwLock.Unlock() + + resp, err := f(session) + if err != nil { + pool.removeSessionFromList(&pool.activeSessions, session) + return nil, err + } + + if resp.ErrorCode == nebula.ErrorCode_SUCCEEDED { + return resp, nil + } else if ErrorCode(resp.ErrorCode) != ErrorCode_E_SESSION_INVALID { // only retry when the session is invalid + return resp, err + } + + // remove invalid session regardless of the retry is successful or not + defer pool.removeSessionFromList(&pool.activeSessions, session) + // If the session is invalid, close it and get a new session + for i := 0; i < retry; i++ { + pool.log.Info("retry to get sessions") + newSession, err := pool.newSession() + if err != nil { + return nil, err + } + + pingErr := newSession.Ping() + if pingErr != nil { + pool.log.Error("failed to ping the session, error: " + pingErr.Error()) + continue + } + pool.log.Info("retry to get sessions successfully") + pool.addSessionToList(&pool.activeSessions, newSession) + + return f(newSession) + } + pool.log.Error(fmt.Sprintf("failed to get session after " + strconv.Itoa(retry) + " retries")) + return nil, fmt.Errorf("failed to get session after %d retries", retry) +} + // startCleaner starts sessionCleaner if idleTime > 0. func (pool *SessionPool) startCleaner() { if pool.conf.idleTime > 0 && pool.cleanerChan == nil { diff --git a/session_pool_test.go b/session_pool_test.go index cd4c5006..5b36e1ce 100644 --- a/session_pool_test.go +++ b/session_pool_test.go @@ -319,6 +319,50 @@ func TestIdleSessionCleaner(t *testing.T) { sessionPool.conf.minSize, sessionPool.GetTotalSessionCount()) } +func TestRetryGetSession(t *testing.T) { + err := prepareSpace("client_test") + if err != nil { + t.Fatal(err) + } + defer dropSpace("client_test") + + hostAddress := HostAddress{Host: address, Port: port} + config, err := NewSessionPoolConf( + "root", + "nebula", + []HostAddress{hostAddress}, + "client_test") + if err != nil { + t.Errorf("failed to create session pool config, %s", err.Error()) + } + config.minSize = 2 + config.maxSize = 2 + config.retryGetSessionTimes = 1 + + // create session pool + sessionPool, err := NewSessionPool(*config, DefaultLogger{}) + if err != nil { + t.Fatal(err) + } + defer sessionPool.Close() + + // kill all sessions in the cluster + resultSet, err := sessionPool.Execute("SHOW SESSIONS | KILL SESSIONS $-.SessionId") + if err != nil { + t.Fatal(err) + } + assert.True(t, resultSet.IsSucceed(), fmt.Errorf("error code: %d, error msg: %s", + resultSet.GetErrorCode(), resultSet.GetErrorMsg())) + + // execute query, it should retry to get session + resultSet, err = sessionPool.Execute("SHOW HOSTS;") + if err != nil { + t.Fatal(err) + } + assert.True(t, resultSet.IsSucceed(), fmt.Errorf("error code: %d, error msg: %s", + resultSet.GetErrorCode(), resultSet.GetErrorMsg())) +} + func BenchmarkConcurrency(b *testing.B) { err := prepareSpace("client_test") if err != nil {