From 727c463cdaccfc3aed66a7e7d9d2a31a4ee81cfb Mon Sep 17 00:00:00 2001 From: Yichen Wang <18348405+Aiee@users.noreply.github.com> Date: Mon, 6 Feb 2023 11:58:48 +0800 Subject: [PATCH 1/5] Update readme for v3.4 release (#250) * Fix format * Update README --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 69d1e785..6b8c8301 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ **IMPORTANT: Code of Nebula go client has been transferred from [nebula-clients](https://github.com/vesoft-inc/nebula-clients) to this repository(nebula-go), and new releases in the future will be published in this repository. Please update your go.mod and imports correspondingly.** -Official Nebula Go client which communicates with the server using [fbthrift](https://github.com/facebook/fbthrift/). Currently the latest stable release is **[v3.3.0](https://github.com/vesoft-inc/nebula-go/tree/release-v3.3)** +Official Nebula Go client which communicates with the server using [fbthrift](https://github.com/facebook/fbthrift/). Currently the latest stable release is **[v3.4.0](https://github.com/vesoft-inc/nebula-go/tree/release-v3.4)** The code in **master branch** will be updated to accommodate the nightly changes made in NebulaGraph. To Use the console with a stable release of NebulaGraph, please check the branches and use the corresponding version. @@ -22,6 +22,7 @@ To Use the console with a stable release of NebulaGraph, please check the branch | **[v3.1.x](https://github.com/vesoft-inc/nebula-go/tree/v3.1.0)** | 3.1.x | | **[v3.2.x](https://github.com/vesoft-inc/nebula-go/tree/v3.2.0)** | 3.1.x-3.2.x | | **[v3.3.x](https://github.com/vesoft-inc/nebula-go/tree/v3.3.0)** | 3.1.x-3.3.x | +| **[v3.4.x](https://github.com/vesoft-inc/nebula-go/tree/v3.4.0)** | 3.1.x-3.4.x | | **[master](https://github.com/vesoft-inc/nebula-go/tree/master)** | 3.x-nightly | Please be careful not to modify the files in the nebula directory, these codes were all generated by fbthrift. @@ -37,7 +38,7 @@ $ go get -u -v github.com/vesoft-inc/nebula-go/v3@master You can specify the version of Nebula-go by substituting `` in `$ go get -u -v github.com/vesoft-inc/nebula-go@`. For example: - for v3: `$ go get -u -v github.com/vesoft-inc/nebula-go/v3@v3.3.0` + for v3: `$ go get -u -v github.com/vesoft-inc/nebula-go/v3@v3.4.0` for v2: `$ go get -u -v github.com/vesoft-inc/nebula-go/v2@v2.6.0` From 285ea9ee7d9cce5d2bf0b18e24b7b4363fd64328 Mon Sep 17 00:00:00 2001 From: Aiee <18348405+Aiee@users.noreply.github.com> Date: Wed, 8 Feb 2023 17:16:07 +0800 Subject: [PATCH 2/5] Retry to get session when session is invalid --- client_test.go | 2 +- configs.go | 32 +++++++++++++++-------------- session.go | 4 ++-- session_pool.go | 48 +++++++++++++++++++++++++++++++++++++++++++- session_pool_test.go | 44 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 111 insertions(+), 19 deletions(-) diff --git a/client_test.go b/client_test.go index 97cc8b18..4b891058 100644 --- a/client_test.go +++ b/client_test.go @@ -26,7 +26,7 @@ import ( const ( address = "127.0.0.1" - port = 3699 + port = 29562 username = "root" password = "nebula" diff --git a/configs.go b/configs.go index b9aae63c..479af4e9 100644 --- a/configs.go +++ b/configs.go @@ -112,12 +112,13 @@ func openAndReadFile(path string) ([]byte, error) { // SessionPoolConf is the configs of a session pool // Note that the space name is bound to the session pool for its lifetime type SessionPoolConf struct { - username string // username for authentication - password string // password for authentication - serviceAddrs []HostAddress // service addresses for session pool - hostIndex int // index of the host in ServiceAddrs that the next new session will connect to - spaceName string // The space name that all sessions in the pool are bound to - sslConfig *tls.Config // Optional SSL config for the connection + username string // username for authentication + password string // password for authentication + serviceAddrs []HostAddress // service addresses for session pool + hostIndex int // index of the host in ServiceAddrs that the next new session will connect to + spaceName string // The space name that all sessions in the pool are bound to + sslConfig *tls.Config // Optional SSL config for the connection + retryGetSessionTimes int // The max times to retry get session from pool executing a single query // Basic pool configs // Socket timeout and Socket connection timeout, unit: seconds @@ -143,15 +144,16 @@ func NewSessionPoolConf( spaceName string, opts ...SessionPoolConfOption) (*SessionPoolConf, error) { // Set default values for basic pool configs newPoolConf := SessionPoolConf{ - username: username, - password: password, - serviceAddrs: serviceAddrs, - spaceName: spaceName, - timeOut: 0 * time.Millisecond, - idleTime: 0 * time.Millisecond, - maxSize: 30, - minSize: 1, - hostIndex: 0, + username: username, + password: password, + serviceAddrs: serviceAddrs, + spaceName: spaceName, + retryGetSessionTimes: 0, + timeOut: 0 * time.Millisecond, + idleTime: 0 * time.Millisecond, + maxSize: 30, + minSize: 1, + hostIndex: 0, } // Iterate the given options and apply them to the config. diff --git a/session.go b/session.go index 3b611b2f..8cf23e43 100644 --- a/session.go +++ b/session.go @@ -198,7 +198,7 @@ func (session *Session) ExecuteJsonWithParameter(stmt string, params map[string] } func (session *Session) reConnect() error { - newconnection, err := session.connPool.getIdleConn() + newConnection, err := session.connPool.getIdleConn() if err != nil { err = fmt.Errorf(err.Error()) return err @@ -206,7 +206,7 @@ func (session *Session) reConnect() error { // Release connection to pool session.connPool.release(session.connection) - session.connection = newconnection + session.connection = newConnection return nil } diff --git a/session_pool.go b/session_pool.go index 830ea94c..620db017 100644 --- a/session_pool.go +++ b/session_pool.go @@ -11,10 +11,12 @@ package nebula_go import ( "container/list" "fmt" + "strconv" "sync" "time" "github.com/vesoft-inc/nebula-go/v3/nebula" + "github.com/vesoft-inc/nebula-go/v3/nebula/graph" ) // SessionPool is a pool that manages sessions internally. @@ -117,7 +119,15 @@ func (pool *SessionPool) ExecuteWithParameter(stmt string, params map[string]int } // Execute the query - resp, err := session.connection.executeWithParameter(session.sessionID, stmt, paramsMap) + execFunc := func(s *Session) (*graph.ExecutionResponse, error) { + resp, err := s.connection.executeWithParameter(s.sessionID, stmt, paramsMap) + if err != nil { + return nil, err + } + return resp, nil + } + + resp, err := pool.executeWithRetry(session, execFunc, pool.conf.retryGetSessionTimes) if err != nil { return nil, err } @@ -384,6 +394,42 @@ func (pool *SessionPool) getIdleSession() (*Session, error) { " session pool and the total session count has reached the limit") } +// retryGetSession tries to get a session from the pool for retry times. +func (pool *SessionPool) executeWithRetry(session *Session, f func(*Session) (*graph.ExecutionResponse, error), + retry int) (*graph.ExecutionResponse, error) { + resp, err := f(session) + if err != nil { + return nil, err + } + + if resp.ErrorCode == nebula.ErrorCode_SUCCEEDED { + return resp, nil + } else if ErrorCode(resp.ErrorCode) != ErrorCode_E_SESSION_INVALID { // only retry when the session is invalid + return resp, err + } + + // If the session is invalid, close it and get a new session + for i := 0; i < retry; i++ { + pool.log.Info("retry to get sessions") + newSession, err := pool.newSession() + if err != nil { + return nil, err + } + + pingErr := newSession.Ping() + if pingErr != nil { + pool.log.Error("failed to ping the session, error: " + pingErr.Error()) + continue + } + pool.log.Info("retry to get sessions successfully") + pool.addSessionToList(&pool.activeSessions, newSession) + pool.removeSessionFromList(&pool.activeSessions, session) + return f(newSession) + } + pool.log.Error(fmt.Sprintf("failed to get session after " + strconv.Itoa(retry) + " retries")) + return nil, err +} + // startCleaner starts sessionCleaner if idleTime > 0. func (pool *SessionPool) startCleaner() { if pool.conf.idleTime > 0 && pool.cleanerChan == nil { diff --git a/session_pool_test.go b/session_pool_test.go index cd4c5006..5b36e1ce 100644 --- a/session_pool_test.go +++ b/session_pool_test.go @@ -319,6 +319,50 @@ func TestIdleSessionCleaner(t *testing.T) { sessionPool.conf.minSize, sessionPool.GetTotalSessionCount()) } +func TestRetryGetSession(t *testing.T) { + err := prepareSpace("client_test") + if err != nil { + t.Fatal(err) + } + defer dropSpace("client_test") + + hostAddress := HostAddress{Host: address, Port: port} + config, err := NewSessionPoolConf( + "root", + "nebula", + []HostAddress{hostAddress}, + "client_test") + if err != nil { + t.Errorf("failed to create session pool config, %s", err.Error()) + } + config.minSize = 2 + config.maxSize = 2 + config.retryGetSessionTimes = 1 + + // create session pool + sessionPool, err := NewSessionPool(*config, DefaultLogger{}) + if err != nil { + t.Fatal(err) + } + defer sessionPool.Close() + + // kill all sessions in the cluster + resultSet, err := sessionPool.Execute("SHOW SESSIONS | KILL SESSIONS $-.SessionId") + if err != nil { + t.Fatal(err) + } + assert.True(t, resultSet.IsSucceed(), fmt.Errorf("error code: %d, error msg: %s", + resultSet.GetErrorCode(), resultSet.GetErrorMsg())) + + // execute query, it should retry to get session + resultSet, err = sessionPool.Execute("SHOW HOSTS;") + if err != nil { + t.Fatal(err) + } + assert.True(t, resultSet.IsSucceed(), fmt.Errorf("error code: %d, error msg: %s", + resultSet.GetErrorCode(), resultSet.GetErrorMsg())) +} + func BenchmarkConcurrency(b *testing.B) { err := prepareSpace("client_test") if err != nil { From 3e97db499705e3580dbafdb6ad0b5d05e93b7d0d Mon Sep 17 00:00:00 2001 From: Aiee <18348405+Aiee@users.noreply.github.com> Date: Wed, 8 Feb 2023 17:19:01 +0800 Subject: [PATCH 3/5] Simplify client test --- client_test.go | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/client_test.go b/client_test.go index 4b891058..729d2c31 100644 --- a/client_test.go +++ b/client_test.go @@ -26,7 +26,7 @@ import ( const ( address = "127.0.0.1" - port = 29562 + port = 3699 username = "root" password = "nebula" @@ -1202,15 +1202,11 @@ func TestReconnect(t *testing.T) { defer pool.Close() // Create session - var sessionList []*Session - - for i := 0; i < 3; i++ { - session, err := pool.GetSession(username, password) - if err != nil { - t.Errorf("fail to create a new session from connection pool, %s", err.Error()) - } - sessionList = append(sessionList, session) + session, err := pool.GetSession(username, password) + if err != nil { + t.Errorf("fail to create a new session from connection pool, %s", err.Error()) } + defer session.Release() // Send query to server periodically for i := 0; i < timeoutConfig.MaxConnPoolSize; i++ { @@ -1221,7 +1217,7 @@ func TestReconnect(t *testing.T) { if i == 7 { stopContainer(t, "nebula-docker-compose_graphd1_1") } - _, err := sessionList[0].Execute("SHOW HOSTS;") + _, err := session.Execute("SHOW HOSTS;") fmt.Println("Sending query...") if err != nil { @@ -1230,7 +1226,7 @@ func TestReconnect(t *testing.T) { } } - resp, err := sessionList[0].Execute("SHOW HOSTS;") + resp, err := session.Execute("SHOW HOSTS;") if err != nil { t.Fatalf(err.Error()) return @@ -1240,10 +1236,6 @@ func TestReconnect(t *testing.T) { startContainer(t, "nebula-docker-compose_graphd_1") startContainer(t, "nebula-docker-compose_graphd1_1") - for i := 0; i < len(sessionList); i++ { - sessionList[i].Release() - } - // Wait for graphd to be up time.Sleep(5 * time.Second) } From 5d69c432513bef051650a1c16764db7fe69090d5 Mon Sep 17 00:00:00 2001 From: Aiee <18348405+Aiee@users.noreply.github.com> Date: Wed, 8 Feb 2023 17:21:00 +0800 Subject: [PATCH 4/5] Add comments --- configs.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/configs.go b/configs.go index 479af4e9..01a188b5 100644 --- a/configs.go +++ b/configs.go @@ -118,7 +118,7 @@ type SessionPoolConf struct { hostIndex int // index of the host in ServiceAddrs that the next new session will connect to spaceName string // The space name that all sessions in the pool are bound to sslConfig *tls.Config // Optional SSL config for the connection - retryGetSessionTimes int // The max times to retry get session from pool executing a single query + retryGetSessionTimes int // The max times to retry get new session when executing a query // Basic pool configs // Socket timeout and Socket connection timeout, unit: seconds @@ -148,7 +148,7 @@ func NewSessionPoolConf( password: password, serviceAddrs: serviceAddrs, spaceName: spaceName, - retryGetSessionTimes: 0, + retryGetSessionTimes: 1, timeOut: 0 * time.Millisecond, idleTime: 0 * time.Millisecond, maxSize: 30, From d3b3e3cff1b9fd153f7cdd56a69d7acc46742aa2 Mon Sep 17 00:00:00 2001 From: Aiee <18348405+Aiee@users.noreply.github.com> Date: Thu, 9 Feb 2023 19:45:16 +0800 Subject: [PATCH 5/5] Add comments --- session_pool.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/session_pool.go b/session_pool.go index 620db017..e778a5c3 100644 --- a/session_pool.go +++ b/session_pool.go @@ -394,11 +394,17 @@ func (pool *SessionPool) getIdleSession() (*Session, error) { " session pool and the total session count has reached the limit") } -// retryGetSession tries to get a session from the pool for retry times. -func (pool *SessionPool) executeWithRetry(session *Session, f func(*Session) (*graph.ExecutionResponse, error), +// retryGetSession tries to create a new session when the current session is invalid. +func (pool *SessionPool) executeWithRetry( + session *Session, + f func(*Session) (*graph.ExecutionResponse, error), retry int) (*graph.ExecutionResponse, error) { + pool.rwLock.Lock() + defer pool.rwLock.Unlock() + resp, err := f(session) if err != nil { + pool.removeSessionFromList(&pool.activeSessions, session) return nil, err } @@ -408,6 +414,8 @@ func (pool *SessionPool) executeWithRetry(session *Session, f func(*Session) (*g return resp, err } + // remove invalid session regardless of the retry is successful or not + defer pool.removeSessionFromList(&pool.activeSessions, session) // If the session is invalid, close it and get a new session for i := 0; i < retry; i++ { pool.log.Info("retry to get sessions") @@ -423,11 +431,11 @@ func (pool *SessionPool) executeWithRetry(session *Session, f func(*Session) (*g } pool.log.Info("retry to get sessions successfully") pool.addSessionToList(&pool.activeSessions, newSession) - pool.removeSessionFromList(&pool.activeSessions, session) + return f(newSession) } pool.log.Error(fmt.Sprintf("failed to get session after " + strconv.Itoa(retry) + " retries")) - return nil, err + return nil, fmt.Errorf("failed to get session after %d retries", retry) } // startCleaner starts sessionCleaner if idleTime > 0.