Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
"@kbn/test-subj-selector": "0.2.1",
"@kbn/ui-framework": "1.0.0",
"@kbn/ui-shared-deps": "1.0.0",
"@types/yauzl": "^2.9.1",
"JSONStream": "1.3.5",
"abortcontroller-polyfill": "^1.4.0",
"accept": "3.0.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,18 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { pkgToPkgKey } from './index';

const cache: Map<string, Buffer> = new Map();
export const cacheGet = (key: string) => cache.get(key);
export const cacheSet = (key: string, value: Buffer) => cache.set(key, value);
export const cacheHas = (key: string) => cache.has(key);
export const getCacheKey = (key: string) => key + '.tar.gz';
export const cacheClear = () => cache.clear();
export const cacheDelete = (key: string) => cache.delete(key);

const archiveLocationCache: Map<string, string> = new Map();
export const getArchiveLocation = (name: string, version: string) =>
archiveLocationCache.get(pkgToPkgKey({ name, version }));

export const setArchiveLocation = (name: string, version: string, location: string) =>
archiveLocationCache.set(pkgToPkgKey({ name, version }), location);
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import tar from 'tar';
import yauzl from 'yauzl';
import { bufferToStream, streamToBuffer } from './streams';

export interface ArchiveEntry {
Expand All @@ -30,3 +31,40 @@ export async function untarBuffer(
deflatedStream.pipe(inflateStream);
});
}

export async function unzipBuffer(
buffer: Buffer,
filter = (entry: ArchiveEntry): boolean => true,
onEntry = (entry: ArchiveEntry): void => {}
): Promise<void> {
const zipfile = await yauzlFromBuffer(buffer, { lazyEntries: true });
zipfile.readEntry();
zipfile.on('entry', async (entry: yauzl.Entry) => {
const path = entry.fileName;
if (!filter({ path })) return zipfile.readEntry();

const entryBuffer = await getZipReadStream(zipfile, entry).then(streamToBuffer);
onEntry({ buffer: entryBuffer, path });
zipfile.readEntry();
});
return new Promise((resolve, reject) => zipfile.on('end', resolve).on('error', reject));
}

function yauzlFromBuffer(buffer: Buffer, opts: yauzl.Options): Promise<yauzl.ZipFile> {
return new Promise((resolve, reject) =>
yauzl.fromBuffer(buffer, opts, (err?: Error, handle?: yauzl.ZipFile) =>
err ? reject(err) : resolve(handle)
)
);
}

function getZipReadStream(
zipfile: yauzl.ZipFile,
entry: yauzl.Entry
): Promise<NodeJS.ReadableStream> {
return new Promise((resolve, reject) =>
zipfile.openReadStream(entry, (err?: Error, readStream?: NodeJS.ReadableStream) =>
err ? reject(err) : resolve(readStream)
)
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@
*/

import { AssetParts } from '../../../types';
import { pathParts, splitPkgKey } from './index';
import { getBufferExtractor, pathParts, splitPkgKey } from './index';
import { getArchiveLocation } from './cache';
import { untarBuffer, unzipBuffer } from './extract';

jest.mock('./cache', () => {
return {
getArchiveLocation: jest.fn(),
};
});

const mockedGetArchiveLocation = getArchiveLocation as jest.Mock;

const testPaths = [
{
Expand Down Expand Up @@ -80,3 +90,21 @@ describe('splitPkgKey tests', () => {
expect(pkgVersion).toBe('0.13.0-alpha.1+abcd');
});
});

describe('getBufferExtractor', () => {
it('throws if the archive has not been downloaded/cached yet', () => {
expect(() => getBufferExtractor('missing', '1.2.3')).toThrow('no archive location');
});

it('returns unzipBuffer if the archive key ends in .zip', () => {
mockedGetArchiveLocation.mockImplementation(() => '.zip');
const extractor = getBufferExtractor('will-use-mocked-key', 'a.b.c');
expect(extractor).toBe(unzipBuffer);
});

it('returns untarBuffer if the key ends in anything else', () => {
mockedGetArchiveLocation.mockImplementation(() => 'xyz');
const extractor = getBufferExtractor('will-use-mocked-key', 'a.b.c');
expect(extractor).toBe(untarBuffer);
});
});
45 changes: 23 additions & 22 deletions x-pack/plugins/ingest_manager/server/services/epm/registry/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import {
RegistrySearchResults,
RegistrySearchResult,
} from '../../../types';
import { cacheGet, cacheSet, getCacheKey, cacheHas } from './cache';
import { ArchiveEntry, untarBuffer } from './extract';
import { cacheGet, cacheSet, cacheHas, getArchiveLocation, setArchiveLocation } from './cache';
import { ArchiveEntry, untarBuffer, unzipBuffer } from './extract';
import { fetchUrl, getResponse, getResponseStream } from './requests';
import { streamToBuffer } from './streams';
import { getRegistryUrl } from './registry_url';
Expand Down Expand Up @@ -130,17 +130,17 @@ export async function getArchiveInfo(
filter = (entry: ArchiveEntry): boolean => true
): Promise<string[]> {
const paths: string[] = [];
const onEntry = (entry: ArchiveEntry) => {
const archiveBuffer = await getOrFetchArchiveBuffer(pkgName, pkgVersion);
const bufferExtractor = getBufferExtractor(pkgName, pkgVersion);
await bufferExtractor(archiveBuffer, filter, (entry: ArchiveEntry) => {
const { path, buffer } = entry;
const { file } = pathParts(path);
if (!file) return;
if (buffer) {
cacheSet(path, buffer);
paths.push(path);
}
};

await extract(pkgName, pkgVersion, filter, onEntry);
});

return paths;
}
Expand Down Expand Up @@ -175,24 +175,20 @@ export function pathParts(path: string): AssetParts {
} as AssetParts;
}

async function extract(
pkgName: string,
pkgVersion: string,
filter = (entry: ArchiveEntry): boolean => true,
onEntry: (entry: ArchiveEntry) => void
) {
const archiveBuffer = await getOrFetchArchiveBuffer(pkgName, pkgVersion);
export function getBufferExtractor(pkgName: string, pkgVersion: string) {
const archiveLocation = getArchiveLocation(pkgName, pkgVersion);
if (!archiveLocation) throw new Error(`no archive location for ${pkgName} ${pkgVersion}`);
const isZip = archiveLocation.endsWith('.zip');
const bufferExtractor = isZip ? unzipBuffer : untarBuffer;

return untarBuffer(archiveBuffer, filter, onEntry);
return bufferExtractor;
}

async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise<Buffer> {
// assume .tar.gz for now. add support for .zip if/when we need it
const key = getCacheKey(`${pkgName}-${pkgVersion}`);
let buffer = cacheGet(key);
const key = getArchiveLocation(pkgName, pkgVersion);
let buffer = key && cacheGet(key);
if (!buffer) {
buffer = await fetchArchiveBuffer(pkgName, pkgVersion);
cacheSet(key, buffer);
}

if (buffer) {
Expand All @@ -203,16 +199,21 @@ async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Pro
}

export async function ensureCachedArchiveInfo(name: string, version: string) {
const pkgkey = getCacheKey(`${name}-${version}`);
if (!cacheHas(pkgkey)) {
const pkgkey = getArchiveLocation(name, version);
if (!pkgkey || !cacheHas(pkgkey)) {
await getArchiveInfo(name, version);
}
}

async function fetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise<Buffer> {
const { download: archivePath } = await fetchInfo(pkgName, pkgVersion);
const registryUrl = getRegistryUrl();
return getResponseStream(`${registryUrl}${archivePath}`).then(streamToBuffer);
const archiveUrl = `${getRegistryUrl()}${archivePath}`;
const buffer = await getResponseStream(archiveUrl).then(streamToBuffer);

setArchiveLocation(pkgName, pkgVersion, archivePath);
cacheSet(archivePath, buffer);

return buffer;
}

export function getAsset(key: string) {
Expand Down