Skip to content

Commit

Permalink
fix(stats): unsubscribe from stats subscription when leaving dashboard
Browse files Browse the repository at this point in the history
  • Loading branch information
Izak88 committed Dec 13, 2017
1 parent a9ec7e5 commit 4b73041
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 50 deletions.
6 changes: 3 additions & 3 deletions e2e/140_dashboard.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ describe('Dashboard', () => {

it('should go to dashboard and see memory widget', () => {
return browser.get('/dashboard')
.then((): any => browser.wait(() => element(by.css('[name="memory-usage"]')).isPresent()))
.then(() => element(by.css('[name="memory-usage"]')).getCssValue('width'))
.then(width => expect(width).to.not.equals('0px'));
.then((): any => browser.wait(() => element(by.css('[name="memory-usage"]')).isPresent()))
.then(() => element(by.css('[name="memory-usage"]')).getCssValue('width'))
.then(width => expect(width).to.not.equals('0px'));
});
});
65 changes: 23 additions & 42 deletions src/api/docker-stats.ts
Original file line number Diff line number Diff line change
@@ -1,60 +1,41 @@
import { Observable, Observer } from 'rxjs';
import * as utils from './utils';
import * as dockerode from 'dockerode';
import { listContainers, calculateContainerStats } from './docker';
import { processes } from './process-manager';

export const docker = new dockerode();

export function getContainersStats(): Observable<any> {
return new Observable(observer => {
Observable
const sub = Observable
.interval(2000)
.timeInterval()
.mergeMap(() => {
return docker.listContainers().then(containers => {
return Promise.all(containers.map(container => {
return docker.getContainer(container.Id).stats().then((stream: any) => {
let json = '';
return new Promise(resolve => {
stream.on('data', buf => {
let rawJson = json + buf.toString();
try {
let data = JSON.parse(rawJson);

if (data && data.precpu_stats.system_cpu_usage) {
const jobId = container.Names[0].split('_')[2] || -1;
const job = processes.find(p => p.job_id === Number(jobId));
let debug = false;
if (job) {
debug = job.debug || false;
}

const stats = {
id: container.Id,
name: container.Names[0].substr(1) || '',
cpu: getCpuData(data),
network: getNetworkData(data),
memory: getMemory(data),
debug: debug
};

stream.destroy();
resolve(stats);
}
} catch (e) {
json = rawJson;
}
});
});
});
})).then(stats => stats);
});
return listContainers()
.then(containers => Promise.all(containers.map(c => getContainerStats(c))));
})
.map(stats => observer.next({ type: 'containersStats', data: stats }))
.subscribe();

return () => {
if (sub) {
sub.unsubscribe();
}
};
}).share();
}

function getContainerStats(container: any): Promise<any> {
return calculateContainerStats(container, processes).then(stats => {
return {
id: stats.id,
name: stats.name,
cpu: getCpuData(stats.data),
network: getNetworkData(stats.data),
memory: getMemory(stats.data),
debug: stats.debug
};
});
}

function getCpuData(json: any): { usage: string, cores: number } {
const postCpuStats = json.cpu_stats;
const preCpuStats = json.precpu_stats;
Expand Down
45 changes: 44 additions & 1 deletion src/api/docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,12 @@ export function attachExec(id: string, cmd: any): Observable<any> {
});
}

export function listContainers(): Promise<dockerode.ContainerInfo[]> {
return docker.listContainers();
}

export function stopAllContainers(): Promise<any[]> {
return docker.listContainers()
return listContainers()
.then(containers => {
return Promise.all(containers.map(containerInfo => {
return docker.getContainer(containerInfo.Id).stop();
Expand Down Expand Up @@ -255,3 +259,42 @@ export function isDockerInstalled(): Observable<boolean> {
});
});
}

export function calculateContainerStats(
container: dockerode.ContainerInfo,
processes: any
): Promise<any> {
return docker.getContainer(container.Id).stats()
.then(stream => {
let json = '';
return new Promise((resolve, reject) => {
stream.on('data', buf => {
let rawJson = json + buf.toString();
try {
let data = JSON.parse(rawJson);

if (data && data.precpu_stats.system_cpu_usage) {
const jobId = container.Names[0].split('_')[2] || -1;
const job = processes.find(p => p.job_id === Number(jobId));
let debug = false;
if (job) {
debug = job.debug || false;
}

const stats = {
id: container.Id,
name: container.Names[0].substr(1) || '',
debug: debug,
data: data
};

stream.destroy();
resolve(stats);
}
} catch (e) {
json = rawJson;
}
});
});
});
}
17 changes: 13 additions & 4 deletions src/api/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export interface Client {
session: { cookie: any, ip: string, userId: number, email: string, isAdmin: boolean };
socket: uws.Socket;
send: Function;
subscriptions: { stats: Subscription };
}

export class SocketServer {
Expand Down Expand Up @@ -107,7 +108,8 @@ export class SocketServer {
sessionID: socket.upgradeReq.sessionID,
session: socket.upgradeReq.session,
socket: socket,
send: (message: any) => client.socket.send(JSON.stringify(message))
send: (message: any) => client.socket.send(JSON.stringify(message)),
subscriptions: { stats: null }
};
this.addClient(client);

Expand Down Expand Up @@ -306,9 +308,16 @@ export class SocketServer {
break;

case 'subscribeToStats':
Observable.merge(...[
memory(), cpu(), getContainersStats()
]).subscribe(event => client.send(event));
client.subscriptions.stats =
Observable.merge(...[
memory(), cpu(), getContainersStats()
]).subscribe(event => client.send(event));
break;

case 'unsubscribeFromStats':
if (client.subscriptions.stats) {
client.subscriptions.stats.unsubscribe();
}
break;
}
}
Expand Down

0 comments on commit 4b73041

Please sign in to comment.