From 4dc43366a787d156b8b26aa3ae567435f63da0fd Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Thu, 13 Jan 2022 19:02:30 +0100 Subject: [PATCH 1/8] Implement Result.PeekRecord + TestKit messages --- neo4j/result.go | 55 ++++++++++++++++++++++++++++-------- testkit-backend/backend.go | 58 +++++++++++++++++++++++++++----------- 2 files changed, 85 insertions(+), 28 deletions(-) diff --git a/neo4j/result.go b/neo4j/result.go index 91aa9cc3..9e5fd4eb 100644 --- a/neo4j/result.go +++ b/neo4j/result.go @@ -31,6 +31,9 @@ type Result interface { // NextRecord returns true if there is a record to be processed, record parameter is set // to point to current record. NextRecord(record **Record) bool + // PeekRecord returns true if there is a record after the current one to be processed without advancing the record + // stream, record parameter is set to point to that record if present. + PeekRecord(record **Record) bool // Err returns the latest error that caused this Next to return false. Err() error // Record returns the current record. @@ -46,13 +49,16 @@ type Result interface { } type result struct { - conn db.Connection - streamHandle db.StreamHandle - cypher string - params map[string]interface{} - record *Record - summary *db.Summary - err error + conn db.Connection + streamHandle db.StreamHandle + cypher string + params map[string]interface{} + record *Record + summary *db.Summary + err error + peekedRecord *Record + peekedSummary *db.Summary + peeked bool } func newResult(conn db.Connection, str db.StreamHandle, cypher string, params map[string]interface{}) *result { @@ -64,23 +70,48 @@ func newResult(conn db.Connection, str db.StreamHandle, cypher string, params ma } } +func (r *result) advance() { + if r.peeked { + r.record = r.peekedRecord + r.summary = r.peekedSummary + r.peeked = false + } else { + r.record, r.summary, r.err = r.conn.Next(r.streamHandle) + } +} + +func (r *result) peek() { + if !r.peeked { + r.peekedRecord, r.peekedSummary, r.err = r.conn.Next(r.streamHandle) + r.peeked = true + } +} + func (r *result) Keys() ([]string, error) { return r.conn.Keys(r.streamHandle) } func (r *result) Next() bool { - r.record, r.summary, r.err = r.conn.Next(r.streamHandle) + r.advance() return r.record != nil } func (r *result) NextRecord(out **Record) bool { - r.record, r.summary, r.err = r.conn.Next(r.streamHandle) + r.advance() if out != nil { *out = r.record } return r.record != nil } +func (r *result) PeekRecord(out **Record) bool { + r.peek() + if out != nil { + *out = r.peekedRecord + } + return r.peekedRecord != nil +} + func (r *result) Record() *Record { return r.record } @@ -92,7 +123,7 @@ func (r *result) Err() error { func (r *result) Collect() ([]*Record, error) { recs := make([]*Record, 0, 1024) for r.summary == nil && r.err == nil { - r.record, r.summary, r.err = r.conn.Next(r.streamHandle) + r.advance() if r.record != nil { recs = append(recs, r.record) } @@ -109,7 +140,7 @@ func (r *result) buffer() { func (r *result) Single() (*Record, error) { // Try retrieving the single record - r.record, r.summary, r.err = r.conn.Next(r.streamHandle) + r.advance() if r.err != nil { return nil, wrapError(r.err) } @@ -122,7 +153,7 @@ func (r *result) Single() (*Record, error) { single := r.record // Probe connection for more records - r.record, r.summary, r.err = r.conn.Next(r.streamHandle) + r.advance() if r.record != nil { // There were more records, consume the stream since the user didn't // expect more records and should therefore not use them. diff --git a/testkit-backend/backend.go b/testkit-backend/backend.go index d186cc39..6a645f9e 100644 --- a/testkit-backend/backend.go +++ b/testkit-backend/backend.go @@ -132,7 +132,11 @@ func (b *backend) writeError(err error) { if isDriverError { id := b.setError(err) - b.writeResponse("DriverError", map[string]interface{}{"id": id, "msg": err.Error(), "code": code}) + b.writeResponse("DriverError", map[string]interface{}{ + "id": id, + "errorType": strings.Split(err.Error(), ":")[0], + "msg": err.Error(), + "code": code}) return } @@ -341,6 +345,35 @@ func (s serverAddress) Port() string { return s.port } +func (b *backend) writeRecord(result neo4j.Result, record *neo4j.Record, expectRecord *bool) { + if expectRecord != nil { + if *expectRecord && record == nil { + b.writeResponse("BackendError", map[string]interface{}{ + "msg": "Found no record where one was expected.", + }) + } else if !*expectRecord && record != nil { + b.writeResponse("BackendError", map[string]interface{}{ + "msg": "Found a record where none was expected.", + }) + } + } + if record != nil { + values := record.Values + cypherValues := make([]interface{}, len(values)) + for i, v := range values { + cypherValues[i] = nativeToCypher(v) + } + b.writeResponse("Record", map[string]interface{}{"values": cypherValues}) + } else { + err := result.Err() + if err != nil { + b.writeError(err) + return + } + b.writeResponse("NullRecord", nil) + } +} + func (b *backend) handleRequest(req map[string]interface{}) { name := req["name"].(string) data := req["data"].(map[string]interface{}) @@ -557,21 +590,12 @@ func (b *backend) handleRequest(req map[string]interface{}) { case "ResultNext": result := b.results[data["resultId"].(string)] more := result.Next() - if more { - values := result.Record().Values - cypherValues := make([]interface{}, len(values)) - for i, v := range values { - cypherValues[i] = nativeToCypher(v) - } - b.writeResponse("Record", map[string]interface{}{"values": cypherValues}) - } else { - err := result.Err() - if err != nil { - b.writeError(err) - return - } - b.writeResponse("NullRecord", nil) - } + b.writeRecord(result, result.Record(), &more) + case "ResultPeek": + result := b.results[data["resultId"].(string)] + var record *db.Record = nil + more := result.PeekRecord(&record) + b.writeRecord(result, record, &more) case "ResultConsume": result := b.results[data["resultId"].(string)] summary, err := result.Consume() @@ -592,6 +616,7 @@ func (b *backend) handleRequest(req map[string]interface{}) { b.writeResponse("FeatureList", map[string]interface{}{ "features": []string{ "ConfHint:connection.recv_timeout_seconds", + "Feature:API:Result.Peek", "Feature:Auth:Custom", "Feature:Auth:Bearer", "Feature:Auth:Kerberos", @@ -667,5 +692,6 @@ func testSkips() map[string]string { "stub.configuration_hints.test_connection_recv_timeout_seconds.TestRoutingConnectionRecvTimeout.*": "No GetRoutingTable support - too tricky to implement in Go", "stub.homedb.test_homedb.TestHomeDb.test_session_should_cache_home_db_despite_new_rt": "Driver does not remove servers from RT when connection breaks.", "neo4j.test_authentication.TestAuthenticationBasic.test_error_on_incorrect_credentials_tx": "Driver retries tx on failed authentication.", + "stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_failure_tx_run": "Driver does not reset failed connection but raises error on Session.Close()", } } From c121648a7be806f2d2f929b7ffb7af580107ac3b Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Fri, 14 Jan 2022 17:16:09 +0100 Subject: [PATCH 2/8] Fix session re-raising errors on `.Close` Signed-off-by: Florent Biville --- neo4j/db/connection.go | 2 ++ neo4j/internal/bolt/bolt3.go | 4 ++++ neo4j/internal/bolt/bolt4.go | 4 ++++ neo4j/internal/pool/pool.go | 2 +- neo4j/internal/testutil/connfake.go | 4 ++++ neo4j/transaction.go | 7 ++++++- testkit-backend/backend.go | 1 - 7 files changed, 21 insertions(+), 3 deletions(-) diff --git a/neo4j/db/connection.go b/neo4j/db/connection.go index 8c264baf..4f039425 100644 --- a/neo4j/db/connection.go +++ b/neo4j/db/connection.go @@ -85,6 +85,8 @@ type Connection interface { // Implementation of this should be passive, no pinging or similair since it might be // called rather frequently. IsAlive() bool + // HasFailed returns true if the connection has received a recoverable error (``FAILURE``). + HasFailed() bool // Returns the point in time when this connection was established. Birthdate() time.Time // Resets connection to same state as directly after a connect. diff --git a/neo4j/internal/bolt/bolt3.go b/neo4j/internal/bolt/bolt3.go index b851db8d..cd050315 100644 --- a/neo4j/internal/bolt/bolt3.go +++ b/neo4j/internal/bolt/bolt3.go @@ -649,6 +649,10 @@ func (b *bolt3) IsAlive() bool { return b.state != bolt3_dead } +func (b *bolt3) HasFailed() bool { + return b.state == bolt3_failed +} + func (b *bolt3) Birthdate() time.Time { return b.birthDate } diff --git a/neo4j/internal/bolt/bolt4.go b/neo4j/internal/bolt/bolt4.go index 8f6657a1..d44f6a37 100644 --- a/neo4j/internal/bolt/bolt4.go +++ b/neo4j/internal/bolt/bolt4.go @@ -842,6 +842,10 @@ func (b *bolt4) IsAlive() bool { return b.state != bolt4_dead } +func (b *bolt4) HasFailed() bool { + return b.state == bolt4_failed +} + func (b *bolt4) Birthdate() time.Time { return b.birthDate } diff --git a/neo4j/internal/pool/pool.go b/neo4j/internal/pool/pool.go index 4507c941..f698ba0e 100644 --- a/neo4j/internal/pool/pool.go +++ b/neo4j/internal/pool/pool.go @@ -351,7 +351,7 @@ func (p *Pool) Return(c db.Connection) { return } - c.SetBoltLogger(nil) + defer c.SetBoltLogger(nil) // Get the name of the server that the connection belongs to. serverName := c.ServerName() diff --git a/neo4j/internal/testutil/connfake.go b/neo4j/internal/testutil/connfake.go index 1c26f67b..7317a8f6 100644 --- a/neo4j/internal/testutil/connfake.go +++ b/neo4j/internal/testutil/connfake.go @@ -78,6 +78,10 @@ func (c *ConnFake) IsAlive() bool { return c.Alive } +func (c *ConnFake) HasFailed() bool { + return false +} + func (c *ConnFake) Reset() { } diff --git a/neo4j/transaction.go b/neo4j/transaction.go index 39d9471c..dee075fc 100644 --- a/neo4j/transaction.go +++ b/neo4j/transaction.go @@ -68,7 +68,12 @@ func (tx *transaction) Rollback() error { if tx.done { return tx.err } - tx.err = tx.conn.TxRollback(tx.txHandle) + if !tx.conn.IsAlive() || tx.conn.HasFailed() { + // tx implicitly rolled back by having failed + tx.err = nil + } else { + tx.err = tx.conn.TxRollback(tx.txHandle) + } tx.done = true tx.onClosed() return wrapError(tx.err) diff --git a/testkit-backend/backend.go b/testkit-backend/backend.go index 6a645f9e..dcda45be 100644 --- a/testkit-backend/backend.go +++ b/testkit-backend/backend.go @@ -692,6 +692,5 @@ func testSkips() map[string]string { "stub.configuration_hints.test_connection_recv_timeout_seconds.TestRoutingConnectionRecvTimeout.*": "No GetRoutingTable support - too tricky to implement in Go", "stub.homedb.test_homedb.TestHomeDb.test_session_should_cache_home_db_despite_new_rt": "Driver does not remove servers from RT when connection breaks.", "neo4j.test_authentication.TestAuthenticationBasic.test_error_on_incorrect_credentials_tx": "Driver retries tx on failed authentication.", - "stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_failure_tx_run": "Driver does not reset failed connection but raises error on Session.Close()", } } From 608dbd31a65095fca19d307c4537ce5e3b4a352d Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Fri, 14 Jan 2022 17:16:58 +0100 Subject: [PATCH 3/8] Code clean-up Signed-off-by: Florent Biville --- neo4j/result.go | 34 ++++++++++----------- testkit-backend/backend.go | 61 +++++++++++++++++++------------------- 2 files changed, 47 insertions(+), 48 deletions(-) diff --git a/neo4j/result.go b/neo4j/result.go index 9e5fd4eb..f23591d2 100644 --- a/neo4j/result.go +++ b/neo4j/result.go @@ -70,23 +70,6 @@ func newResult(conn db.Connection, str db.StreamHandle, cypher string, params ma } } -func (r *result) advance() { - if r.peeked { - r.record = r.peekedRecord - r.summary = r.peekedSummary - r.peeked = false - } else { - r.record, r.summary, r.err = r.conn.Next(r.streamHandle) - } -} - -func (r *result) peek() { - if !r.peeked { - r.peekedRecord, r.peekedSummary, r.err = r.conn.Next(r.streamHandle) - r.peeked = true - } -} - func (r *result) Keys() ([]string, error) { return r.conn.Keys(r.streamHandle) } @@ -196,3 +179,20 @@ func (r *result) Consume() (ResultSummary, error) { } return r.toResultSummary(), nil } + +func (r *result) advance() { + if r.peeked { + r.record, r.peekedRecord = r.peekedRecord, nil + r.summary, r.peekedSummary = r.peekedSummary, nil + r.peeked = false + } else { + r.record, r.summary, r.err = r.conn.Next(r.streamHandle) + } +} + +func (r *result) peek() { + if !r.peeked { + r.peekedRecord, r.peekedSummary, r.err = r.conn.Next(r.streamHandle) + r.peeked = true + } +} \ No newline at end of file diff --git a/testkit-backend/backend.go b/testkit-backend/backend.go index dcda45be..277a4df8 100644 --- a/testkit-backend/backend.go +++ b/testkit-backend/backend.go @@ -345,35 +345,6 @@ func (s serverAddress) Port() string { return s.port } -func (b *backend) writeRecord(result neo4j.Result, record *neo4j.Record, expectRecord *bool) { - if expectRecord != nil { - if *expectRecord && record == nil { - b.writeResponse("BackendError", map[string]interface{}{ - "msg": "Found no record where one was expected.", - }) - } else if !*expectRecord && record != nil { - b.writeResponse("BackendError", map[string]interface{}{ - "msg": "Found a record where none was expected.", - }) - } - } - if record != nil { - values := record.Values - cypherValues := make([]interface{}, len(values)) - for i, v := range values { - cypherValues[i] = nativeToCypher(v) - } - b.writeResponse("Record", map[string]interface{}{"values": cypherValues}) - } else { - err := result.Err() - if err != nil { - b.writeError(err) - return - } - b.writeResponse("NullRecord", nil) - } -} - func (b *backend) handleRequest(req map[string]interface{}) { name := req["name"].(string) data := req["data"].(map[string]interface{}) @@ -590,12 +561,12 @@ func (b *backend) handleRequest(req map[string]interface{}) { case "ResultNext": result := b.results[data["resultId"].(string)] more := result.Next() - b.writeRecord(result, result.Record(), &more) + b.writeRecord(result, result.Record(), more) case "ResultPeek": result := b.results[data["resultId"].(string)] var record *db.Record = nil more := result.PeekRecord(&record) - b.writeRecord(result, record, &more) + b.writeRecord(result, record, more) case "ResultConsume": result := b.results[data["resultId"].(string)] summary, err := result.Consume() @@ -650,6 +621,34 @@ func (b *backend) handleRequest(req map[string]interface{}) { } } +func (b *backend) writeRecord(result neo4j.Result, record *neo4j.Record, expectRecord bool) { + if expectRecord && record == nil { + b.writeResponse("BackendError", map[string]interface{}{ + "msg": "Found no record where one was expected.", + }) + } else if !expectRecord && record != nil { + b.writeResponse("BackendError", map[string]interface{}{ + "msg": "Found a record where none was expected.", + }) + } + + if record != nil { + values := record.Values + cypherValues := make([]interface{}, len(values)) + for i, v := range values { + cypherValues[i] = nativeToCypher(v) + } + b.writeResponse("Record", map[string]interface{}{"values": cypherValues}) + } else { + err := result.Err() + if err != nil { + b.writeError(err) + return + } + b.writeResponse("NullRecord", nil) + } +} + func mustSkip(testName string) (string, bool) { skippedTests := testSkips() for testPattern, exclusionReason := range skippedTests { From 9dc6bf33b01fe0ddc568c56e92551b3325d405c6 Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Fri, 14 Jan 2022 17:17:15 +0100 Subject: [PATCH 4/8] Fix invalid TestKit protocol response Signed-off-by: Florent Biville --- testkit-backend/backend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testkit-backend/backend.go b/testkit-backend/backend.go index 277a4df8..7d038bbc 100644 --- a/testkit-backend/backend.go +++ b/testkit-backend/backend.go @@ -541,7 +541,7 @@ func (b *backend) handleRequest(req map[string]interface{}) { b.writeError(err) return } - b.writeResponse("TransactionClose", map[string]interface{}{"id": txId}) + b.writeResponse("Transaction", map[string]interface{}{"id": txId}) case "SessionReadTransaction": b.handleTransactionFunc(true, data) From aed8ac348965f61a5f9ebc42111ea32e3df72c7c Mon Sep 17 00:00:00 2001 From: Florent Biville Date: Mon, 17 Jan 2022 14:32:13 +0100 Subject: [PATCH 5/8] Clean up test code (remove redundant type declaration) --- neo4j/result_test.go | 64 +++++++++++++++++++++----------------------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/neo4j/result_test.go b/neo4j/result_test.go index 4fd92c87..06feac00 100644 --- a/neo4j/result_test.go +++ b/neo4j/result_test.go @@ -41,13 +41,11 @@ func TestResult(ot *testing.T) { cypher := "" params := map[string]interface{}{} recs := []*db.Record{ - &db.Record{}, - &db.Record{}, - &db.Record{}, - } - sums := []*db.Summary{ - &db.Summary{}, + {Keys: []string{"n"}, Values: []interface{}{42}}, + {Keys: []string{"n"}, Values: []interface{}{43}}, + {Keys: []string{"n"}, Values: []interface{}{44}}, } + sums := []*db.Summary{{}} errs := []error{ errors.New("Whatever"), } @@ -80,37 +78,37 @@ func TestResult(ot *testing.T) { { name: "happy", stream: []Next{ - Next{Record: recs[0]}, - Next{Record: recs[1]}, - Next{Summary: sums[0]}, + {Record: recs[0]}, + {Record: recs[1]}, + {Summary: sums[0]}, }, iters: []iter{ - iter{expectNext: true, expectRec: recs[0]}, - iter{expectNext: true, expectRec: recs[1]}, - iter{expectNext: false, expectSum: sums[0]}, + {expectNext: true, expectRec: recs[0]}, + {expectNext: true, expectRec: recs[1]}, + {expectNext: false, expectSum: sums[0]}, }, }, { name: "error after one record", stream: []Next{ - Next{Record: recs[0]}, - Next{Err: errs[0]}, + {Record: recs[0]}, + {Err: errs[0]}, }, iters: []iter{ - iter{expectNext: true, expectRec: recs[0]}, - iter{expectNext: false, expectErr: errs[0]}, + {expectNext: true, expectRec: recs[0]}, + {expectNext: false, expectErr: errs[0]}, }, }, { name: "proceed after error", stream: []Next{ - Next{Record: recs[0]}, - Next{Err: errs[0]}, + {Record: recs[0]}, + {Err: errs[0]}, }, iters: []iter{ - iter{expectNext: true, expectRec: recs[0]}, - iter{expectNext: false, expectErr: errs[0]}, - iter{expectNext: false, expectErr: errs[0]}, + {expectNext: true, expectRec: recs[0]}, + {expectNext: false, expectErr: errs[0]}, + {expectNext: false, expectErr: errs[0]}, }, }, } @@ -148,7 +146,7 @@ func TestResult(ot *testing.T) { conn := &ConnFake{ ConsumeSum: sums[0], ConsumeErr: nil, - Nexts: []Next{Next{Record: recs[0]}}, + Nexts: []Next{{Record: recs[0]}}, } res := newResult(conn, streamHandle, cypher, params) // Get one record to make sure that Record() is cleared @@ -165,7 +163,7 @@ func TestResult(ot *testing.T) { conn := &ConnFake{ ConsumeSum: nil, ConsumeErr: errs[0], - Nexts: []Next{Next{Record: recs[0]}}, + Nexts: []Next{{Record: recs[0]}}, } res := newResult(conn, streamHandle, cypher, params) // Get one record to make sure that Record() is cleared @@ -182,7 +180,7 @@ func TestResult(ot *testing.T) { // Single ot.Run("Single with one record", func(t *testing.T) { conn := &ConnFake{ - Nexts: []Next{Next{Record: recs[0]}, Next{Summary: sums[0]}}, + Nexts: []Next{{Record: recs[0]}, {Summary: sums[0]}}, } res := newResult(conn, streamHandle, cypher, params) rec, err := res.Single() @@ -194,7 +192,7 @@ func TestResult(ot *testing.T) { }) ot.Run("Single with no record", func(t *testing.T) { conn := &ConnFake{ - Nexts: []Next{Next{Summary: sums[0]}}, + Nexts: []Next{{Summary: sums[0]}}, } res := newResult(conn, streamHandle, cypher, params) rec, err := res.Single() @@ -208,7 +206,7 @@ func TestResult(ot *testing.T) { ot.Run("Single with two records", func(t *testing.T) { calledConsume := false conn := &ConnFake{ - Nexts: []Next{Next{Record: recs[0]}, Next{Record: recs[1]}, Next{Summary: sums[0]}}, + Nexts: []Next{{Record: recs[0]}, {Record: recs[1]}, {Summary: sums[0]}}, ConsumeHook: func() { calledConsume = true }, @@ -232,7 +230,7 @@ func TestResult(ot *testing.T) { }) ot.Run("Single with error", func(t *testing.T) { conn := &ConnFake{ - Nexts: []Next{Next{Err: errs[0]}}, + Nexts: []Next{{Err: errs[0]}}, } res := newResult(conn, streamHandle, cypher, params) rec, err := res.Single() @@ -246,7 +244,7 @@ func TestResult(ot *testing.T) { // Collect ot.Run("Collect n records", func(t *testing.T) { conn := &ConnFake{ - Nexts: []Next{Next{Record: recs[0]}, Next{Record: recs[1]}, Next{Summary: sums[0]}}, + Nexts: []Next{{Record: recs[0]}, {Record: recs[1]}, {Summary: sums[0]}}, } res := newResult(conn, streamHandle, cypher, params) coll, err := res.Collect() @@ -260,7 +258,7 @@ func TestResult(ot *testing.T) { }) ot.Run("Collect n records after Next", func(t *testing.T) { conn := &ConnFake{ - Nexts: []Next{Next{Record: recs[0]}, Next{Record: recs[1]}, Next{Record: recs[2]}, Next{Summary: sums[0]}}, + Nexts: []Next{{Record: recs[0]}, {Record: recs[1]}, {Record: recs[2]}, {Summary: sums[0]}}, } res := newResult(conn, streamHandle, cypher, params) res.Next() @@ -276,7 +274,7 @@ func TestResult(ot *testing.T) { }) ot.Run("Collect empty", func(t *testing.T) { conn := &ConnFake{ - Nexts: []Next{Next{Summary: sums[0]}}, + Nexts: []Next{{Summary: sums[0]}}, } res := newResult(conn, streamHandle, cypher, params) coll, err := res.Collect() @@ -287,7 +285,7 @@ func TestResult(ot *testing.T) { }) ot.Run("Collect emptied", func(t *testing.T) { conn := &ConnFake{ - Nexts: []Next{Next{Summary: sums[0]}}, + Nexts: []Next{{Summary: sums[0]}}, } res := newResult(conn, streamHandle, cypher, params) res.Next() @@ -300,7 +298,7 @@ func TestResult(ot *testing.T) { }) ot.Run("Collect error", func(t *testing.T) { conn := &ConnFake{ - Nexts: []Next{Next{Err: errs[0]}}, + Nexts: []Next{{Err: errs[0]}}, } res := newResult(conn, streamHandle, cypher, params) coll, err := res.Collect() @@ -311,7 +309,7 @@ func TestResult(ot *testing.T) { }) ot.Run("Collect stream error", func(t *testing.T) { conn := &ConnFake{ - Nexts: []Next{Next{Record: recs[0]}, Next{Err: errs[0]}}, + Nexts: []Next{{Record: recs[0]}, {Err: errs[0]}}, } res := newResult(conn, streamHandle, cypher, params) coll, err := res.Collect() From 20b49c81578f115ad845d98668636309900c9b5d Mon Sep 17 00:00:00 2001 From: Florent Biville Date: Mon, 17 Jan 2022 14:32:37 +0100 Subject: [PATCH 6/8] Test peek behavior with fake connection --- neo4j/internal/testutil/asserts.go | 17 +++++++++++++++++ neo4j/internal/testutil/connfake.go | 13 +++++++++---- neo4j/result_test.go | 21 +++++++++++++++++++++ 3 files changed, 47 insertions(+), 4 deletions(-) diff --git a/neo4j/internal/testutil/asserts.go b/neo4j/internal/testutil/asserts.go index 971635b5..b89571f5 100644 --- a/neo4j/internal/testutil/asserts.go +++ b/neo4j/internal/testutil/asserts.go @@ -195,3 +195,20 @@ func AssertSameType(t *testing.T, x, y interface{}) { t.Errorf("Expected types of %s and %s to be same but was %s and %s", x, y, t1, t2) } } + +func AssertDeepEquals(t *testing.T, values ...interface{}) { + t.Helper() + count := len(values) + if count == 0 { + return + } + prev := values[0] + for i := 1; i < count; i++ { + current := values[i] + if !reflect.DeepEqual(prev, current) { + t.Errorf("Expected value %v (parameter %d) to equal value %v (parameter %d)", prev, i-1, current, i) + return + } + prev = current + } +} diff --git a/neo4j/internal/testutil/connfake.go b/neo4j/internal/testutil/connfake.go index 7317a8f6..63541a6a 100644 --- a/neo4j/internal/testutil/connfake.go +++ b/neo4j/internal/testutil/connfake.go @@ -152,11 +152,16 @@ func (c *ConnFake) Keys(streamHandle db.StreamHandle) ([]string, error) { } func (c *ConnFake) Next(streamHandle db.StreamHandle) (*db.Record, *db.Summary, error) { - next := c.Nexts[0] - if len(c.Nexts) > 1 { - c.Nexts = c.Nexts[1:] + if len(c.Nexts) >= 1 { + next := c.Nexts[0] + // moves to next record only if the current record is not an error or summary + // this emulates the stream buffering of a real connection + if next.Err == nil && next.Summary == nil { + c.Nexts = c.Nexts[1:] + } + return next.Record, next.Summary, next.Err } - return next.Record, next.Summary, next.Err + return nil, nil, nil } func (c *ConnFake) ForceReset() error { diff --git a/neo4j/result_test.go b/neo4j/result_test.go index 06feac00..a6c6814c 100644 --- a/neo4j/result_test.go +++ b/neo4j/result_test.go @@ -141,6 +141,27 @@ func TestResult(ot *testing.T) { }) } + // PeekRecord + ot.Run("Peeks records", func(t *testing.T) { + var peedkedFirst *Record + var peekedSecond *Record + var nextFirst *Record + var peekedAfterNextFirst *Record + var nextSecond *Record + conn := &ConnFake{Nexts: []Next{{Record: recs[0]}}} + + result := newResult(conn, streamHandle, cypher, params) + + AssertTrue(t, result.PeekRecord(&peedkedFirst)) + AssertTrue(t, result.PeekRecord(&peekedSecond)) + AssertTrue(t, result.NextRecord(&nextFirst)) + AssertDeepEquals(t, recs[0], peedkedFirst, peekedSecond, nextFirst) + AssertFalse(t, result.PeekRecord(&peekedAfterNextFirst)) + AssertNil(t, peekedAfterNextFirst) + AssertFalse(t, result.NextRecord(&nextSecond)) + AssertNil(t, nextSecond) + }) + // Consume ot.Run("Consume with summary", func(t *testing.T) { conn := &ConnFake{ From 7adde45749bf402275fa35224049019d5cf02c28 Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Mon, 24 Jan 2022 19:01:10 +0100 Subject: [PATCH 7/8] Adjust integration test --- neo4j/test-integration/bookmark_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neo4j/test-integration/bookmark_test.go b/neo4j/test-integration/bookmark_test.go index c57f2195..f2c27bec 100644 --- a/neo4j/test-integration/bookmark_test.go +++ b/neo4j/test-integration/bookmark_test.go @@ -251,7 +251,7 @@ var _ = Describe("Bookmark", func() { Expect(err).To(Not(BeNil())) err = tx.Close() - Expect(err).To(Not(BeNil())) + Expect(err).To(BeNil()) Expect(session.LastBookmark()).To(Equal(bookmark)) }) From 0023ef7034a32fad19a67be36280cd4c5dbc886f Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Tue, 25 Jan 2022 10:07:59 +0100 Subject: [PATCH 8/8] Fix race condition Signed-off-by: Florent Biville --- neo4j/internal/pool/pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/neo4j/internal/pool/pool.go b/neo4j/internal/pool/pool.go index f698ba0e..ea8b5b6b 100644 --- a/neo4j/internal/pool/pool.go +++ b/neo4j/internal/pool/pool.go @@ -351,8 +351,6 @@ func (p *Pool) Return(c db.Connection) { return } - defer c.SetBoltLogger(nil) - // Get the name of the server that the connection belongs to. serverName := c.ServerName() isAlive := c.IsAlive() @@ -380,6 +378,8 @@ func (p *Pool) Return(c db.Connection) { isAlive = c.IsAlive() } + c.SetBoltLogger(nil) + // Shouldn't return a too old or dead connection back to the pool if !isAlive || age >= p.maxAge { p.unreg(serverName, c, now)