From 17a30c9633bed4fd0a3fa9b9315585583a60b7a5 Mon Sep 17 00:00:00 2001 From: Valentinas Janeiko Date: Tue, 21 Jan 2025 19:09:21 +0000 Subject: [PATCH 1/3] test: Add a test to reproduce the issue --- tests/index.js | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/tests/index.js b/tests/index.js index bf81b036..b9b819a0 100644 --- a/tests/index.js +++ b/tests/index.js @@ -1914,6 +1914,45 @@ t('Copy read', async() => { ] }) +t('Copy read with back-pressure', async() => { + await sql`create table test (x int)` + + // Make sure there are enough rows in the table to fill the buffer + // so that `CopyDone` message is handled while the socket is paused + await sql`insert into test select * from generate_series(1,12774)` + + let result = 0 + const readable = await sql`copy test to stdout`.readable() + readable.on('data', _ => result++) + + // Pause the stream so that the entire buffer fills up + readable.pause() + + await Promise.all([ + // Wait until the stream has been consumed + new Promise(r => readable.on('end', r)), + (async() => { + // Wait until the entire buffer fills up, + await new Promise(r => readable.on('readable', () => { + if (readable.readableBuffer.length === 12774) + r() + })) + // Switch the stream back to flowing mode (allowing it to be consumed) + readable.removeAllListeners('readable') + })() + ]) + + // This is the actual test, the copy stream is done + // we should be able to run a new query + await sql`SELECT 1` + + return [ + result, + 12774, + await sql`drop table test` + ] +}) + t('Copy write', { timeout: 2 }, async() => { await sql`create table test (x int)` const writable = await sql`copy test from stdin`.writable() From 77c820e964bc40eb43f40a2fca841d516ced0ed9 Mon Sep 17 00:00:00 2001 From: Valentinas Janeiko Date: Tue, 21 Jan 2025 19:10:32 +0000 Subject: [PATCH 2/3] fix: Ensure socket is unpaused after COPY stream is complete --- src/connection.js | 1 + 1 file changed, 1 insertion(+) diff --git a/src/connection.js b/src/connection.js index 97cc97e1..d98dded1 100644 --- a/src/connection.js +++ b/src/connection.js @@ -884,6 +884,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose function CopyDone() { stream && stream.push(null) stream = null + socket.isPaused() && socket.resume() } function NoticeResponse(x) { From 2797ff49c754e8d81ce2cc55eb5d8e7cec7ee68a Mon Sep 17 00:00:00 2001 From: Valentinas Janeiko Date: Thu, 23 Jan 2025 21:25:05 +0000 Subject: [PATCH 3/3] Compute test table size based on default high water mark --- tests/index.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/index.js b/tests/index.js index b9b819a0..0c175911 100644 --- a/tests/index.js +++ b/tests/index.js @@ -4,6 +4,7 @@ import { t, nt, ot } from './test.js' // eslint-disable-line import net from 'net' import fs from 'fs' import crypto from 'crypto' +import stream from 'stream' import postgres from '../src/index.js' const delay = ms => new Promise(r => setTimeout(r, ms)) @@ -1919,11 +1920,12 @@ t('Copy read with back-pressure', async() => { // Make sure there are enough rows in the table to fill the buffer // so that `CopyDone` message is handled while the socket is paused - await sql`insert into test select * from generate_series(1,12774)` + const bufferSize = Math.ceil((stream.getDefaultHighWaterMark || (() => 16384))() / 6) + await sql`insert into test select * from generate_series(10000,${9999 + bufferSize})` let result = 0 const readable = await sql`copy test to stdout`.readable() - readable.on('data', _ => result++) + readable.on('data', () => result++) // Pause the stream so that the entire buffer fills up readable.pause() @@ -1934,7 +1936,7 @@ t('Copy read with back-pressure', async() => { (async() => { // Wait until the entire buffer fills up, await new Promise(r => readable.on('readable', () => { - if (readable.readableBuffer.length === 12774) + if (readable.readableBuffer.length === bufferSize) r() })) // Switch the stream back to flowing mode (allowing it to be consumed) @@ -1948,7 +1950,7 @@ t('Copy read with back-pressure', async() => { return [ result, - 12774, + bufferSize, await sql`drop table test` ] })