Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { tmpdir } from 'os';
import { join } from 'path';

import { InMemoryTxPool } from '../../test-helpers/testbench-utils.js';
import { FileStoreTxSource } from '../tx_collection/file_store_tx_source.js';
import type { TxFileStoreConfig } from './config.js';
import { TxFileStore } from './tx_file_store.js';

Expand Down Expand Up @@ -103,7 +104,7 @@ describe('TxFileStore', () => {
await txFileStore!.flush();

expect(spy).toHaveBeenCalledWith(`${basePath}/txs/${tx.getTxHash().toString()}.bin`, tx.toBuffer(), {
compress: false,
compress: true,
});

spy.mockRestore();
Expand Down Expand Up @@ -148,7 +149,7 @@ describe('TxFileStore', () => {
await txFileStore!.flush();

expect(spy).toHaveBeenCalledWith(`${basePath}/txs/${tx.getTxHash().toString()}.bin`, tx.toBuffer(), {
compress: false,
compress: true,
});

spy.mockRestore();
Expand Down Expand Up @@ -328,4 +329,24 @@ describe('TxFileStore', () => {
expect(txFileStore!.getPendingUploadCount()).toBe(0);
});
});

describe('compression round-trip', () => {
it('uploads compressed tx and reads it back via FileStoreTxSource', async () => {
txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore);
txFileStore!.start();

const tx = await makeTx();
await txPool.addPendingTxs([tx]);
await txFileStore!.flush();

// Read back via FileStoreTxSource using the same local file store
const txSource = await FileStoreTxSource.create(`file://${tmpDir}`, basePath, log);
expect(txSource).toBeDefined();

const results = await txSource!.getTxsByHash([tx.getTxHash()]);
expect(results).toHaveLength(1);
expect(results[0]).toBeDefined();
expect(results[0]!.toBuffer()).toEqual(tx.toBuffer());
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ export class TxFileStore {
}

await retry(
() => this.fileStore.save(path, tx.toBuffer(), { compress: false }),
() => this.fileStore.save(path, tx.toBuffer(), { compress: true }),
`Uploading tx ${txHash}`,
makeBackoff([0.1, 0.5, 2]),
this.log,
Expand Down
20 changes: 15 additions & 5 deletions yarn-project/stdlib/src/file-store/local.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
import { access, mkdir, readFile, writeFile } from 'fs/promises';
import { dirname, resolve } from 'path';
import { promisify } from 'util';
import { gunzip as gunzipCb, gzip as gzipCb } from 'zlib';

import type { FileStore } from './interface.js';
import type { FileStore, FileStoreSaveOptions } from './interface.js';

const gzip = promisify(gzipCb);
const gunzip = promisify(gunzipCb);

export class LocalFileStore implements FileStore {
constructor(private readonly basePath: string) {}

public async save(path: string, data: Buffer): Promise<string> {
public async save(path: string, data: Buffer, opts?: FileStoreSaveOptions): Promise<string> {
const fullPath = this.getFullPath(path);
await mkdir(dirname(fullPath), { recursive: true });
await writeFile(fullPath, data);
const toWrite = opts?.compress ? await gzip(data) : data;
await writeFile(fullPath, toWrite);
return `file://${fullPath}`;
}

Expand All @@ -18,9 +24,13 @@ export class LocalFileStore implements FileStore {
return this.save(destPath, data);
}

public read(pathOrUrlStr: string): Promise<Buffer> {
public async read(pathOrUrlStr: string): Promise<Buffer> {
const fullPath = this.getFullPath(pathOrUrlStr);
return readFile(fullPath);
const data = await readFile(fullPath);
if (data.length >= 2 && data[0] === 0x1f && data[1] === 0x8b) {
return await gunzip(data);
}
return data;
}

public async download(pathOrUrlStr: string, destPath: string): Promise<void> {
Expand Down
15 changes: 12 additions & 3 deletions yarn-project/stdlib/src/file-store/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ import { tmpdir } from 'os';
import { basename, dirname, join } from 'path';
import { Readable } from 'stream';
import { pipeline } from 'stream/promises';
import { createGzip } from 'zlib';
import { promisify } from 'util';
import { createGzip, gunzip as gunzipCb, gzip as gzipCb } from 'zlib';

import type { FileStore, FileStoreSaveOptions } from './interface.js';

const gzip = promisify(gzipCb);
const gunzip = promisify(gunzipCb);

function normalizeBasePath(path: string): string {
return path?.replace(/^\/+|\/+$/g, '') ?? '';
}
Expand Down Expand Up @@ -52,14 +56,15 @@ export class S3FileStore implements FileStore {
const key = this.getFullPath(path);
const shouldCompress = !!opts.compress;

const body = shouldCompress ? (await import('zlib')).gzipSync(data) : data;
const body = shouldCompress ? await gzip(data) : data;
const contentLength = body.length;
const contentType = this.detectContentType(key, shouldCompress);
const put = new PutObjectCommand({
Bucket: this.bucketName,
Key: key,
Body: body,
ContentType: contentType,
ContentEncoding: shouldCompress ? 'gzip' : undefined,
CacheControl: opts.metadata?.['Cache-control'],
Metadata: this.extractUserMetadata(opts.metadata),
ContentLength: contentLength,
Expand Down Expand Up @@ -134,7 +139,11 @@ export class S3FileStore implements FileStore {
for await (const chunk of stream) {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
}
return Buffer.concat(chunks);
const buffer = Buffer.concat(chunks);
if (out.ContentEncoding === 'gzip') {
return await gunzip(buffer);
}
return buffer;
}

public async download(pathOrUrlStr: string, destPath: string): Promise<void> {
Expand Down
Loading