Skip to content

Commit

Permalink
fix(streams): custom demux stream func
Browse files Browse the repository at this point in the history
  • Loading branch information
jkuri committed Aug 3, 2018
1 parent 872d82f commit 12c0956
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
3 changes: 2 additions & 1 deletion src/api/docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import * as envVars from './env-variables';
import * as style from 'ansi-styles';
import { platform } from 'os';
import * as commandExists from 'command-exists';
import { demuxStream } from './utils';

export const docker = new dockerode();
const binds = platform() === 'darwin' ? [] : ['/var/run/docker.sock:/var/run/docker.sock'];
Expand Down Expand Up @@ -142,7 +143,7 @@ export function dockerExec(
next();
};

container.modem.demuxStream(stream.output, ws, ws);
demuxStream(stream.output, ws);
stream.output.on('end', () => ws.end());
})
.catch(err => observer.error(err));
Expand Down
5 changes: 2 additions & 3 deletions src/api/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { existsSync, copyFile, ensureDirectory } from './fs';
import { readFileSync, writeFileSync } from 'fs';
import { ensureDirSync, statSync, remove, readJson, writeJson } from 'fs-extra';
import * as uuid from 'uuid';
import * as temp from 'temp';
import * as glob from 'glob';
import { homedir } from 'os';
import { getHumanSize } from './utils';
Expand Down Expand Up @@ -130,12 +129,12 @@ export function getCacheFilesFromPattern(pattern: string): any[] {
}

export function deleteCacheFilesFromPattern(pattern): Promise<void> {
return new Promise((resolve, reject) => {
return new Promise((res, reject) => {
let cacheFolder = getFilePath('cache');
let search = glob.sync(join(cacheFolder, pattern));

Promise.all(search.map(result => remove(result)))
.then(() => resolve())
.then(() => res())
.catch(err => reject(err));
});
}
Expand Down
25 changes: 25 additions & 0 deletions src/api/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { readFileSync } from 'fs';
import * as request from 'request';
import { CommandType, CommandTypePriority } from './config';
import { Observable } from 'rxjs';
import { Readable, Writable } from 'stream';


export interface JobProcess {
Expand Down Expand Up @@ -139,3 +140,27 @@ export function prepareCommands(proc: JobProcess, allowed: CommandType[]): any {
return proc.commands.indexOf(a) - proc.commands.indexOf(b);
});
}

export function demuxStream(source: Readable, destination: Writable): void {
let header = null;

source
.on('readable', () => {
header = header || source.read(8);
while (header) {
const payload = source.read(header.readUInt32BE(4));
if (!payload) {
break;
}

try {
destination.write(payload);
} catch (e) {
console.log(e);
break;
}
header = source.read(8);
}
})
.on('end', () => destination.end());
}

0 comments on commit 12c0956

Please sign in to comment.