diff --git a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts index e14f47a2b6dd..1a0ab6b19fde 100644 --- a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts +++ b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts @@ -9,6 +9,7 @@ import { tmpdir } from 'os'; import { join } from 'path'; import { InMemoryTxPool } from '../../test-helpers/testbench-utils.js'; +import { FileStoreTxSource } from '../tx_collection/file_store_tx_source.js'; import type { TxFileStoreConfig } from './config.js'; import { TxFileStore } from './tx_file_store.js'; @@ -103,7 +104,7 @@ describe('TxFileStore', () => { await txFileStore!.flush(); expect(spy).toHaveBeenCalledWith(`${basePath}/txs/${tx.getTxHash().toString()}.bin`, tx.toBuffer(), { - compress: false, + compress: true, }); spy.mockRestore(); @@ -148,7 +149,7 @@ describe('TxFileStore', () => { await txFileStore!.flush(); expect(spy).toHaveBeenCalledWith(`${basePath}/txs/${tx.getTxHash().toString()}.bin`, tx.toBuffer(), { - compress: false, + compress: true, }); spy.mockRestore(); @@ -328,4 +329,24 @@ describe('TxFileStore', () => { expect(txFileStore!.getPendingUploadCount()).toBe(0); }); }); + + describe('compression round-trip', () => { + it('uploads compressed tx and reads it back via FileStoreTxSource', async () => { + txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); + txFileStore!.start(); + + const tx = await makeTx(); + await txPool.addPendingTxs([tx]); + await txFileStore!.flush(); + + // Read back via FileStoreTxSource using the same local file store + const txSource = await FileStoreTxSource.create(`file://${tmpDir}`, basePath, log); + expect(txSource).toBeDefined(); + + const results = await txSource!.getTxsByHash([tx.getTxHash()]); + expect(results).toHaveLength(1); + expect(results[0]).toBeDefined(); + expect(results[0]!.toBuffer()).toEqual(tx.toBuffer()); + }); + }); }); diff --git a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.ts b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.ts index 672bdd6d40cc..063c6256680f 100644 --- a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.ts +++ b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.ts @@ -146,7 +146,7 @@ export class TxFileStore { } await retry( - () => this.fileStore.save(path, tx.toBuffer(), { compress: false }), + () => this.fileStore.save(path, tx.toBuffer(), { compress: true }), `Uploading tx ${txHash}`, makeBackoff([0.1, 0.5, 2]), this.log, diff --git a/yarn-project/stdlib/src/file-store/local.ts b/yarn-project/stdlib/src/file-store/local.ts index e2c3409666ba..a01ade236dff 100644 --- a/yarn-project/stdlib/src/file-store/local.ts +++ b/yarn-project/stdlib/src/file-store/local.ts @@ -1,15 +1,21 @@ import { access, mkdir, readFile, writeFile } from 'fs/promises'; import { dirname, resolve } from 'path'; +import { promisify } from 'util'; +import { gunzip as gunzipCb, gzip as gzipCb } from 'zlib'; -import type { FileStore } from './interface.js'; +import type { FileStore, FileStoreSaveOptions } from './interface.js'; + +const gzip = promisify(gzipCb); +const gunzip = promisify(gunzipCb); export class LocalFileStore implements FileStore { constructor(private readonly basePath: string) {} - public async save(path: string, data: Buffer): Promise { + public async save(path: string, data: Buffer, opts?: FileStoreSaveOptions): Promise { const fullPath = this.getFullPath(path); await mkdir(dirname(fullPath), { recursive: true }); - await writeFile(fullPath, data); + const toWrite = opts?.compress ? await gzip(data) : data; + await writeFile(fullPath, toWrite); return `file://${fullPath}`; } @@ -18,9 +24,13 @@ export class LocalFileStore implements FileStore { return this.save(destPath, data); } - public read(pathOrUrlStr: string): Promise { + public async read(pathOrUrlStr: string): Promise { const fullPath = this.getFullPath(pathOrUrlStr); - return readFile(fullPath); + const data = await readFile(fullPath); + if (data.length >= 2 && data[0] === 0x1f && data[1] === 0x8b) { + return await gunzip(data); + } + return data; } public async download(pathOrUrlStr: string, destPath: string): Promise { diff --git a/yarn-project/stdlib/src/file-store/s3.ts b/yarn-project/stdlib/src/file-store/s3.ts index 2033a1e86d9d..22d41347b4bc 100644 --- a/yarn-project/stdlib/src/file-store/s3.ts +++ b/yarn-project/stdlib/src/file-store/s3.ts @@ -13,10 +13,14 @@ import { tmpdir } from 'os'; import { basename, dirname, join } from 'path'; import { Readable } from 'stream'; import { pipeline } from 'stream/promises'; -import { createGzip } from 'zlib'; +import { promisify } from 'util'; +import { createGzip, gunzip as gunzipCb, gzip as gzipCb } from 'zlib'; import type { FileStore, FileStoreSaveOptions } from './interface.js'; +const gzip = promisify(gzipCb); +const gunzip = promisify(gunzipCb); + function normalizeBasePath(path: string): string { return path?.replace(/^\/+|\/+$/g, '') ?? ''; } @@ -52,7 +56,7 @@ export class S3FileStore implements FileStore { const key = this.getFullPath(path); const shouldCompress = !!opts.compress; - const body = shouldCompress ? (await import('zlib')).gzipSync(data) : data; + const body = shouldCompress ? await gzip(data) : data; const contentLength = body.length; const contentType = this.detectContentType(key, shouldCompress); const put = new PutObjectCommand({ @@ -60,6 +64,7 @@ export class S3FileStore implements FileStore { Key: key, Body: body, ContentType: contentType, + ContentEncoding: shouldCompress ? 'gzip' : undefined, CacheControl: opts.metadata?.['Cache-control'], Metadata: this.extractUserMetadata(opts.metadata), ContentLength: contentLength, @@ -134,7 +139,11 @@ export class S3FileStore implements FileStore { for await (const chunk of stream) { chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); } - return Buffer.concat(chunks); + const buffer = Buffer.concat(chunks); + if (out.ContentEncoding === 'gzip') { + return await gunzip(buffer); + } + return buffer; } public async download(pathOrUrlStr: string, destPath: string): Promise {