From 0cdf4a66cbf4b1ce0e21281c31ad1ff5b7aa0614 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krzysztof=20Skrz=C4=99tnicki?= Date: Wed, 30 Aug 2023 12:24:38 +0200 Subject: [PATCH 1/3] MySQL: avoid tiny writes to improve performance in read-heavy scenarios This change is analogous to the Postgres change made in #29812. --- lib/srv/db/mysql/engine.go | 53 ++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/lib/srv/db/mysql/engine.go b/lib/srv/db/mysql/engine.go index 83af22edac3e8..71de39eebd2a6 100644 --- a/lib/srv/db/mysql/engine.go +++ b/lib/srv/db/mysql/engine.go @@ -20,6 +20,7 @@ import ( "context" "crypto/tls" "fmt" + "io" "net" "time" @@ -383,34 +384,46 @@ func (e *Engine) receiveFromServer(serverConn, clientConn net.Conn, serverErrCh "client": clientConn.RemoteAddr(), "server": serverConn.RemoteAddr(), }) - defer func() { - log.Debug("Stop receiving from server.") - close(serverErrCh) - }() + ctr := common.GetMessagesFromServerMetric(sessionCtx.Database) - msgFromServer := common.GetMessagesFromServerMetric(sessionCtx.Database) + // parse and count the messages from the server in a separate goroutine, + // operating on a copy of the server message stream. the copy is arranged below. + copyReader, copyWriter := io.Pipe() + defer copyWriter.Close() - for { - packet, _, err := protocol.ReadPacket(serverConn) - if err != nil { - if utils.IsOKNetworkError(err) { - log.Debug("Server connection closed.") + go func() { + defer copyReader.Close() + + var count int64 + defer func() { + log.WithField("parsed_total", count).Debug("Stopped parsing messages from server.") + }() + + for { + _, _, err := protocol.ReadPacket(copyReader) + if err != nil { return } - log.WithError(err).Error("Failed to read server packet.") - serverErrCh <- err - return - } - msgFromServer.Inc() + count += 1 + ctr.Inc() + } + }() - _, err = protocol.WritePacket(packet, clientConn) - if err != nil { - log.WithError(err).Error("Failed to write client packet.") - serverErrCh <- err - return + // the messages are ultimately copied from e.rawServerConn to e.rawClientConn, + // but a copy of that message stream is written to a synchronous pipe, + // which is read by the analysis goroutine above. + total, err := io.Copy(clientConn, io.TeeReader(serverConn, copyWriter)) + if err != nil { + if utils.IsOKNetworkError(err) { + log.Debug("Server connection closed.") + } else { + log.WithError(err).Warn("Server -> Client copy finished with unexpected error.") } } + + log.Debugf("Stopped receiving from server. Transferred %v bytes.", total) + serverErrCh <- trace.Wrap(err) } // makeAcquireSemaphoreConfig builds parameters for acquiring a semaphore From 6f62a5154e4a1a5fc6ad19e03d9a30f341e94797 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krzysztof=20Skrz=C4=99tnicki?= Date: Thu, 31 Aug 2023 12:01:18 +0200 Subject: [PATCH 2/3] Rename variable for ease of comprehension. --- lib/srv/db/mysql/engine.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/srv/db/mysql/engine.go b/lib/srv/db/mysql/engine.go index 71de39eebd2a6..98880785fb66b 100644 --- a/lib/srv/db/mysql/engine.go +++ b/lib/srv/db/mysql/engine.go @@ -384,7 +384,7 @@ func (e *Engine) receiveFromServer(serverConn, clientConn net.Conn, serverErrCh "client": clientConn.RemoteAddr(), "server": serverConn.RemoteAddr(), }) - ctr := common.GetMessagesFromServerMetric(sessionCtx.Database) + messagesCounter := common.GetMessagesFromServerMetric(sessionCtx.Database) // parse and count the messages from the server in a separate goroutine, // operating on a copy of the server message stream. the copy is arranged below. @@ -406,7 +406,7 @@ func (e *Engine) receiveFromServer(serverConn, clientConn net.Conn, serverErrCh } count += 1 - ctr.Inc() + messagesCounter.Inc() } }() From 578d58c94c47565063e6dd3d30f3039d691eaef5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krzysztof=20Skrz=C4=99tnicki?= Date: Thu, 31 Aug 2023 12:07:37 +0200 Subject: [PATCH 3/3] Correct variable names in the comment. --- lib/srv/db/mysql/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/srv/db/mysql/engine.go b/lib/srv/db/mysql/engine.go index 98880785fb66b..b7fdc7751c392 100644 --- a/lib/srv/db/mysql/engine.go +++ b/lib/srv/db/mysql/engine.go @@ -410,7 +410,7 @@ func (e *Engine) receiveFromServer(serverConn, clientConn net.Conn, serverErrCh } }() - // the messages are ultimately copied from e.rawServerConn to e.rawClientConn, + // the messages are ultimately copied from serverConn to clientConn, // but a copy of that message stream is written to a synchronous pipe, // which is read by the analysis goroutine above. total, err := io.Copy(clientConn, io.TeeReader(serverConn, copyWriter))