diff --git a/src/connection.js b/src/connection.js index 97cc97e1..d98dded1 100644 --- a/src/connection.js +++ b/src/connection.js @@ -884,6 +884,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose function CopyDone() { stream && stream.push(null) stream = null + socket.isPaused() && socket.resume() } function NoticeResponse(x) { diff --git a/tests/index.js b/tests/index.js index bf81b036..0c175911 100644 --- a/tests/index.js +++ b/tests/index.js @@ -4,6 +4,7 @@ import { t, nt, ot } from './test.js' // eslint-disable-line import net from 'net' import fs from 'fs' import crypto from 'crypto' +import stream from 'stream' import postgres from '../src/index.js' const delay = ms => new Promise(r => setTimeout(r, ms)) @@ -1914,6 +1915,46 @@ t('Copy read', async() => { ] }) +t('Copy read with back-pressure', async() => { + await sql`create table test (x int)` + + // Make sure there are enough rows in the table to fill the buffer + // so that `CopyDone` message is handled while the socket is paused + const bufferSize = Math.ceil((stream.getDefaultHighWaterMark || (() => 16384))() / 6) + await sql`insert into test select * from generate_series(10000,${9999 + bufferSize})` + + let result = 0 + const readable = await sql`copy test to stdout`.readable() + readable.on('data', () => result++) + + // Pause the stream so that the entire buffer fills up + readable.pause() + + await Promise.all([ + // Wait until the stream has been consumed + new Promise(r => readable.on('end', r)), + (async() => { + // Wait until the entire buffer fills up, + await new Promise(r => readable.on('readable', () => { + if (readable.readableBuffer.length === bufferSize) + r() + })) + // Switch the stream back to flowing mode (allowing it to be consumed) + readable.removeAllListeners('readable') + })() + ]) + + // This is the actual test, the copy stream is done + // we should be able to run a new query + await sql`SELECT 1` + + return [ + result, + bufferSize, + await sql`drop table test` + ] +}) + t('Copy write', { timeout: 2 }, async() => { await sql`create table test (x int)` const writable = await sql`copy test from stdin`.writable()