From 6e3ef42eb769e8cbcb767edccd9c81eafbcce8e8 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Tue, 17 Feb 2026 17:08:35 +0000 Subject: [PATCH] fix(p2p): fix compress option in file store and enable for tx uploads S3 save was gzipping but not setting ContentEncoding, so reads returned raw gzip bytes. Local file store ignored the compress option entirely. Fix both backends to round-trip correctly and flip tx uploads to use compression. Co-Authored-By: Claude Opus 4.6 --- .../tx_file_store/tx_file_store.test.ts | 25 +++++++++++++++++-- .../services/tx_file_store/tx_file_store.ts | 2 +- yarn-project/stdlib/src/file-store/local.ts | 20 +++++++++++---- yarn-project/stdlib/src/file-store/s3.ts | 15 ++++++++--- 4 files changed, 51 insertions(+), 11 deletions(-) diff --git a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts index e14f47a2b6dd..1a0ab6b19fde 100644 --- a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts +++ b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts @@ -9,6 +9,7 @@ import { tmpdir } from 'os'; import { join } from 'path'; import { InMemoryTxPool } from '../../test-helpers/testbench-utils.js'; +import { FileStoreTxSource } from '../tx_collection/file_store_tx_source.js'; import type { TxFileStoreConfig } from './config.js'; import { TxFileStore } from './tx_file_store.js'; @@ -103,7 +104,7 @@ describe('TxFileStore', () => { await txFileStore!.flush(); expect(spy).toHaveBeenCalledWith(`${basePath}/txs/${tx.getTxHash().toString()}.bin`, tx.toBuffer(), { - compress: false, + compress: true, }); spy.mockRestore(); @@ -148,7 +149,7 @@ describe('TxFileStore', () => { await txFileStore!.flush(); expect(spy).toHaveBeenCalledWith(`${basePath}/txs/${tx.getTxHash().toString()}.bin`, tx.toBuffer(), { - compress: false, + compress: true, }); spy.mockRestore(); @@ -328,4 +329,24 @@ describe('TxFileStore', () => { expect(txFileStore!.getPendingUploadCount()).toBe(0); }); }); + + describe('compression round-trip', () => { + it('uploads compressed tx and reads it back via FileStoreTxSource', async () => { + txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); + txFileStore!.start(); + + const tx = await makeTx(); + await txPool.addPendingTxs([tx]); + await txFileStore!.flush(); + + // Read back via FileStoreTxSource using the same local file store + const txSource = await FileStoreTxSource.create(`file://${tmpDir}`, basePath, log); + expect(txSource).toBeDefined(); + + const results = await txSource!.getTxsByHash([tx.getTxHash()]); + expect(results).toHaveLength(1); + expect(results[0]).toBeDefined(); + expect(results[0]!.toBuffer()).toEqual(tx.toBuffer()); + }); + }); }); diff --git a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.ts b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.ts index 672bdd6d40cc..063c6256680f 100644 --- a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.ts +++ b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.ts @@ -146,7 +146,7 @@ export class TxFileStore { } await retry( - () => this.fileStore.save(path, tx.toBuffer(), { compress: false }), + () => this.fileStore.save(path, tx.toBuffer(), { compress: true }), `Uploading tx ${txHash}`, makeBackoff([0.1, 0.5, 2]), this.log, diff --git a/yarn-project/stdlib/src/file-store/local.ts b/yarn-project/stdlib/src/file-store/local.ts index e2c3409666ba..a01ade236dff 100644 --- a/yarn-project/stdlib/src/file-store/local.ts +++ b/yarn-project/stdlib/src/file-store/local.ts @@ -1,15 +1,21 @@ import { access, mkdir, readFile, writeFile } from 'fs/promises'; import { dirname, resolve } from 'path'; +import { promisify } from 'util'; +import { gunzip as gunzipCb, gzip as gzipCb } from 'zlib'; -import type { FileStore } from './interface.js'; +import type { FileStore, FileStoreSaveOptions } from './interface.js'; + +const gzip = promisify(gzipCb); +const gunzip = promisify(gunzipCb); export class LocalFileStore implements FileStore { constructor(private readonly basePath: string) {} - public async save(path: string, data: Buffer): Promise { + public async save(path: string, data: Buffer, opts?: FileStoreSaveOptions): Promise { const fullPath = this.getFullPath(path); await mkdir(dirname(fullPath), { recursive: true }); - await writeFile(fullPath, data); + const toWrite = opts?.compress ? await gzip(data) : data; + await writeFile(fullPath, toWrite); return `file://${fullPath}`; } @@ -18,9 +24,13 @@ export class LocalFileStore implements FileStore { return this.save(destPath, data); } - public read(pathOrUrlStr: string): Promise { + public async read(pathOrUrlStr: string): Promise { const fullPath = this.getFullPath(pathOrUrlStr); - return readFile(fullPath); + const data = await readFile(fullPath); + if (data.length >= 2 && data[0] === 0x1f && data[1] === 0x8b) { + return await gunzip(data); + } + return data; } public async download(pathOrUrlStr: string, destPath: string): Promise { diff --git a/yarn-project/stdlib/src/file-store/s3.ts b/yarn-project/stdlib/src/file-store/s3.ts index 2033a1e86d9d..22d41347b4bc 100644 --- a/yarn-project/stdlib/src/file-store/s3.ts +++ b/yarn-project/stdlib/src/file-store/s3.ts @@ -13,10 +13,14 @@ import { tmpdir } from 'os'; import { basename, dirname, join } from 'path'; import { Readable } from 'stream'; import { pipeline } from 'stream/promises'; -import { createGzip } from 'zlib'; +import { promisify } from 'util'; +import { createGzip, gunzip as gunzipCb, gzip as gzipCb } from 'zlib'; import type { FileStore, FileStoreSaveOptions } from './interface.js'; +const gzip = promisify(gzipCb); +const gunzip = promisify(gunzipCb); + function normalizeBasePath(path: string): string { return path?.replace(/^\/+|\/+$/g, '') ?? ''; } @@ -52,7 +56,7 @@ export class S3FileStore implements FileStore { const key = this.getFullPath(path); const shouldCompress = !!opts.compress; - const body = shouldCompress ? (await import('zlib')).gzipSync(data) : data; + const body = shouldCompress ? await gzip(data) : data; const contentLength = body.length; const contentType = this.detectContentType(key, shouldCompress); const put = new PutObjectCommand({ @@ -60,6 +64,7 @@ export class S3FileStore implements FileStore { Key: key, Body: body, ContentType: contentType, + ContentEncoding: shouldCompress ? 'gzip' : undefined, CacheControl: opts.metadata?.['Cache-control'], Metadata: this.extractUserMetadata(opts.metadata), ContentLength: contentLength, @@ -134,7 +139,11 @@ export class S3FileStore implements FileStore { for await (const chunk of stream) { chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); } - return Buffer.concat(chunks); + const buffer = Buffer.concat(chunks); + if (out.ContentEncoding === 'gzip') { + return await gunzip(buffer); + } + return buffer; } public async download(pathOrUrlStr: string, destPath: string): Promise {