Skip to content

feat(server): add tunnel time metric to opt-in server usage report #1551

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Oct 18, 2024
8 changes: 6 additions & 2 deletions src/shadowbox/infrastructure/prometheus_scraper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import * as path from 'path';

import * as logging from '../infrastructure/logging';

/**
 * Label set attached to a single Prometheus query result entry.
 *
 * Both keys and values are strings. Callers index this object by label name
 * (for example `metric['location']`) to read the corresponding label value.
 */
export interface QueryResultMetric {
[labelValue: string]: string;
}

export interface QueryResultData {
resultType: 'matrix' | 'vector' | 'scalar' | 'string';
result: Array<{
metric: {[labelValue: string]: string};
metric: QueryResultMetric;
value: [number, string];
}>;
}
Expand Down Expand Up @@ -101,7 +105,7 @@ async function spawnPrometheusSubprocess(
prometheusEndpoint: string
): Promise<child_process.ChildProcess> {
logging.info('======== Starting Prometheus ========');
logging.info(`${binaryFilename} ${processArgs.map(a => `"${a}"`).join(' ')}`);
logging.info(`${binaryFilename} ${processArgs.map((a) => `"${a}"`).join(' ')}`);
const runProcess = child_process.spawn(binaryFilename, processArgs);
runProcess.on('error', (error) => {
logging.error(`Error spawning Prometheus: ${error}`);
Expand Down
2 changes: 1 addition & 1 deletion src/shadowbox/server/mocks/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export class FakePrometheusClient extends PrometheusClient {
const bytesTransferred = this.bytesTransferredById[accessKeyId] || 0;
queryResultData.result.push({
metric: {access_key: accessKeyId},
value: [bytesTransferred, `${bytesTransferred}`],
value: [Date.now() / 1000, `${bytesTransferred}`],
});
}
return queryResultData;
Expand Down
91 changes: 60 additions & 31 deletions src/shadowbox/server/shared_metrics.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {AccessKeyConfigJson} from './server_access_key';

import {ServerConfigJson} from './server_config';
import {
CountryUsage,
LocationUsage,
DailyFeatureMetricsReportJson,
HourlyServerMetricsReportJson,
MetricsCollectorClient,
Expand Down Expand Up @@ -78,12 +78,12 @@ describe('OutlineSharedMetricsPublisher', () => {
);

publisher.startSharing();
usageMetrics.countryUsage = [
{country: 'AA', inboundBytes: 11},
{country: 'BB', inboundBytes: 11},
{country: 'CC', inboundBytes: 22},
{country: 'AA', inboundBytes: 33},
{country: 'DD', inboundBytes: 33},
usageMetrics.locationUsage = [
{country: 'AA', inboundBytes: 11, tunnelTimeSec: 99},
{country: 'BB', inboundBytes: 11, tunnelTimeSec: 88},
{country: 'CC', inboundBytes: 22, tunnelTimeSec: 77},
{country: 'AA', inboundBytes: 33, tunnelTimeSec: 66},
{country: 'DD', inboundBytes: 33, tunnelTimeSec: 55},
];

clock.nowMs += 60 * 60 * 1000;
Expand All @@ -93,18 +93,18 @@ describe('OutlineSharedMetricsPublisher', () => {
startUtcMs: startTime,
endUtcMs: clock.nowMs,
userReports: [
{bytesTransferred: 11, countries: ['AA']},
{bytesTransferred: 11, countries: ['BB']},
{bytesTransferred: 22, countries: ['CC']},
{bytesTransferred: 33, countries: ['AA']},
{bytesTransferred: 33, countries: ['DD']},
{bytesTransferred: 11, countries: ['AA'], tunnelTimeSec: 99},
{bytesTransferred: 11, countries: ['BB'], tunnelTimeSec: 88},
{bytesTransferred: 22, countries: ['CC'], tunnelTimeSec: 77},
{bytesTransferred: 33, countries: ['AA'], tunnelTimeSec: 66},
{bytesTransferred: 33, countries: ['DD'], tunnelTimeSec: 55},
],
});

startTime = clock.nowMs;
usageMetrics.countryUsage = [
{country: 'EE', inboundBytes: 44},
{country: 'FF', inboundBytes: 55},
usageMetrics.locationUsage = [
{country: 'EE', inboundBytes: 44, tunnelTimeSec: 11},
{country: 'FF', inboundBytes: 55, tunnelTimeSec: 22},
];

clock.nowMs += 60 * 60 * 1000;
Expand All @@ -114,13 +114,42 @@ describe('OutlineSharedMetricsPublisher', () => {
startUtcMs: startTime,
endUtcMs: clock.nowMs,
userReports: [
{bytesTransferred: 44, countries: ['EE']},
{bytesTransferred: 55, countries: ['FF']},
{bytesTransferred: 44, countries: ['EE'], tunnelTimeSec: 11},
{bytesTransferred: 55, countries: ['FF'], tunnelTimeSec: 22},
],
});

publisher.stopSharing();
});

// Checks that the optional `asn` of a usage entry is copied into the published
// user report when it is set, and left out of the report when it is not.
it('reports ASN metrics correctly', async () => {
const clock = new ManualClock();
const serverConfig = new InMemoryConfig<ServerConfigJson>({serverId: 'server-id'});
const usageMetrics = new ManualUsageMetrics();
const metricsCollector = new FakeMetricsCollector();
const publisher = new OutlineSharedMetricsPublisher(
clock,
serverConfig,
null,
usageMetrics,
metricsCollector
);
publisher.startSharing();

// One entry carries an ASN and the other does not, so both paths are covered.
usageMetrics.locationUsage = [
{country: 'DD', inboundBytes: 44, tunnelTimeSec: 11, asn: 999},
{country: 'EE', inboundBytes: 55, tunnelTimeSec: 22},
];
// Move the manual clock forward by one hour and run the callbacks it has scheduled.
// NOTE(review): presumably this is what triggers the hourly usage report; confirm against the publisher.
clock.nowMs += 60 * 60 * 1000;
await clock.runCallbacks();

// The second expected report has no `asn` key at all, mirroring its input entry.
expect(metricsCollector.collectedServerUsageReport.userReports).toEqual([
{bytesTransferred: 44, tunnelTimeSec: 11, countries: ['DD'], asn: 999},
{bytesTransferred: 55, tunnelTimeSec: 22, countries: ['EE']},
]);
publisher.stopSharing();
});

it('ignores sanctioned countries', async () => {
const clock = new ManualClock();
const startTime = clock.nowMs;
Expand All @@ -136,12 +165,12 @@ describe('OutlineSharedMetricsPublisher', () => {
);

publisher.startSharing();
usageMetrics.countryUsage = [
{country: 'AA', inboundBytes: 11},
{country: 'SY', inboundBytes: 11},
{country: 'CC', inboundBytes: 22},
{country: 'AA', inboundBytes: 33},
{country: 'DD', inboundBytes: 33},
usageMetrics.locationUsage = [
{country: 'AA', tunnelTimeSec: 99, inboundBytes: 11},
{country: 'SY', tunnelTimeSec: 88, inboundBytes: 11},
{country: 'CC', tunnelTimeSec: 77, inboundBytes: 22},
{country: 'AA', tunnelTimeSec: 66, inboundBytes: 33},
{country: 'DD', tunnelTimeSec: 55, inboundBytes: 33},
];

clock.nowMs += 60 * 60 * 1000;
Expand All @@ -151,10 +180,10 @@ describe('OutlineSharedMetricsPublisher', () => {
startUtcMs: startTime,
endUtcMs: clock.nowMs,
userReports: [
{bytesTransferred: 11, countries: ['AA']},
{bytesTransferred: 22, countries: ['CC']},
{bytesTransferred: 33, countries: ['AA']},
{bytesTransferred: 33, countries: ['DD']},
{bytesTransferred: 11, tunnelTimeSec: 99, countries: ['AA']},
{bytesTransferred: 22, tunnelTimeSec: 77, countries: ['CC']},
{bytesTransferred: 33, tunnelTimeSec: 66, countries: ['AA']},
{bytesTransferred: 33, tunnelTimeSec: 55, countries: ['DD']},
],
});
publisher.stopSharing();
Expand Down Expand Up @@ -257,13 +286,13 @@ class FakeMetricsCollector implements MetricsCollectorClient {
}

class ManualUsageMetrics implements UsageMetrics {
countryUsage = [] as CountryUsage[];
locationUsage = [] as LocationUsage[];

getCountryUsage(): Promise<CountryUsage[]> {
return Promise.resolve(this.countryUsage);
getLocationUsage(): Promise<LocationUsage[]> {
return Promise.resolve(this.locationUsage);
}

reset() {
this.countryUsage = [] as CountryUsage[];
this.locationUsage = [] as LocationUsage[];
}
}
111 changes: 87 additions & 24 deletions src/shadowbox/server/shared_metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {Clock} from '../infrastructure/clock';
import * as follow_redirects from '../infrastructure/follow_redirects';
import {JsonConfig} from '../infrastructure/json_config';
import * as logging from '../infrastructure/logging';
import {PrometheusClient} from '../infrastructure/prometheus_scraper';
import {PrometheusClient, QueryResultMetric} from '../infrastructure/prometheus_scraper';
import * as version from './version';
import {AccessKeyConfigJson} from './server_access_key';

Expand All @@ -26,9 +26,21 @@ const MS_PER_HOUR = 60 * 60 * 1000;
const MS_PER_DAY = 24 * MS_PER_HOUR;
const SANCTIONED_COUNTRIES = new Set(['CU', 'KP', 'SY']);

export interface CountryUsage {
// Names of the Prometheus labels that usage queries are grouped by. The same
// names are used to read the country and ASN back out of each result entry.
const PROMETHEUS_COUNTRY_LABEL = 'location';
const PROMETHEUS_ASN_LABEL = 'asn';

// Result of one usage query, indexed by a serialized form of each entry's label
// set (JSON with the label names sorted). Two queries that return the same label
// set therefore produce the same key, which is what lets their results be joined.
type PrometheusQueryResult = {
[metricKey: string]: {
// The label set exactly as returned by Prometheus.
metric: QueryResultMetric;
// The sample value parsed from its string form and rounded to an integer.
value: number;
};
};

/**
 * Usage figures for one location, i.e. one combination of country and
 * (optionally) autonomous system number.
 */
export interface LocationUsage {
// Value of the Prometheus `location` label; the empty string when the label is absent.
country: string;
// Numeric value of the Prometheus `asn` label; left undefined when the label is missing or empty.
asn?: number;
// Rounded increase of the data-bytes metric over the reporting window; 0 when no sample exists.
inboundBytes: number;
// Rounded increase of the tunnel-time metric over the reporting window; 0 when no sample exists.
tunnelTimeSec: number;
}

// JSON format for the published report.
Expand All @@ -44,7 +56,9 @@ export interface HourlyServerMetricsReportJson {
// Field renames will break backwards-compatibility.
// One row of the hourly usage report, describing the usage seen from a single location.
export interface HourlyUserMetricsReportJson {
// The publisher in this file always sends a single-element array here,
// substituting 'ZZ' when the country is unknown.
countries: string[];
// Only set by the publisher when the location has a truthy ASN.
asn?: number;
// Copied from the location's `inboundBytes`.
bytesTransferred: number;
// Copied from the location's `tunnelTimeSec`.
tunnelTimeSec: number;
}

// JSON format for the feature metrics report.
Expand All @@ -70,7 +84,7 @@ export interface SharedMetricsPublisher {
}

export interface UsageMetrics {
getCountryUsage(): Promise<CountryUsage[]>;
getLocationUsage(): Promise<LocationUsage[]>;
reset();
}

Expand All @@ -80,17 +94,61 @@ export class PrometheusUsageMetrics implements UsageMetrics {

constructor(private prometheusClient: PrometheusClient) {}

async getCountryUsage(): Promise<CountryUsage[]> {
/**
 * Queries Prometheus for the summed increase of a time series over a trailing
 * window, grouped by the country and ASN labels.
 *
 * @param timeSeriesSelector PromQL selector (metric name plus optional label
 *     matchers) that the range `[<deltaSecs>s]` is appended to.
 * @param deltaSecs Length of the trailing window, in seconds.
 * @returns The result entries indexed by their label set, serialized as JSON
 *     with sorted label names, each holding the original labels and the sample
 *     value rounded to the nearest integer.
 */
private async queryUsage(
  timeSeriesSelector: string,
  deltaSecs: number
): Promise<PrometheusQueryResult> {
  // Written with explicit newlines so the query text does not depend on how this file is indented.
  const promQl = `\nsum(increase(${timeSeriesSelector}[${deltaSecs}s]))\nby (${PROMETHEUS_COUNTRY_LABEL}, ${PROMETHEUS_ASN_LABEL})\n`;
  const {result: samples} = await this.prometheusClient.query(promQl);

  const usageByLabels: PrometheusQueryResult = {};
  samples.forEach(({metric, value}) => {
    // Passing the sorted label names as the replacer makes the key independent
    // of the order in which Prometheus returned the labels.
    const sortedLabelNames = Object.keys(metric).sort();
    const labelsKey = JSON.stringify(metric, sortedLabelNames);
    // value[1] is the sample in string form; value[0] is not used here.
    usageByLabels[labelsKey] = {
      metric,
      value: Math.round(parseFloat(value[1])),
    };
  });
  return usageByLabels;
}

async getLocationUsage(): Promise<LocationUsage[]> {
const timeDeltaSecs = Math.round((Date.now() - this.resetTimeMs) / 1000);
// We measure the traffic to and from the target, since that's what we are protecting.
const result = await this.prometheusClient.query(
`sum(increase(shadowsocks_data_bytes_per_location{dir=~"p>t|p<t"}[${timeDeltaSecs}s])) by (location)`
);
const usage = [] as CountryUsage[];
for (const entry of result.result) {
const country = entry.metric['location'] || '';
const inboundBytes = Math.round(parseFloat(entry.value[1]));
usage.push({country, inboundBytes});
const [dataBytesResult, tunnelTimeResult] = await Promise.all([
// We measure the traffic to and from the target, since that's what we are protecting.
this.queryUsage('shadowsocks_data_bytes_per_location{dir=~"p>t|p<t"}', timeDeltaSecs),
this.queryUsage('shadowsocks_tunnel_time_seconds_per_location', timeDeltaSecs),
]);

// We join the bytes and tunneltime metrics together by location (i.e. country and ASN).
const mergedResult: {
[metricKey: string]: {
metric: QueryResultMetric;
inboundBytes?: number;
tunnelTimeSec?: number;
};
} = {};
for (const [key, entry] of Object.entries(dataBytesResult)) {
mergedResult[key] = {...mergedResult[key], metric: entry.metric, inboundBytes: entry.value};
}
for (const [key, entry] of Object.entries(tunnelTimeResult)) {
mergedResult[key] = {...mergedResult[key], metric: entry.metric, tunnelTimeSec: entry.value};
}

const usage: LocationUsage[] = [];
for (const entry of Object.values(mergedResult)) {
const country = entry.metric[PROMETHEUS_COUNTRY_LABEL] || '';
const asn = entry.metric[PROMETHEUS_ASN_LABEL]
? Number(entry.metric[PROMETHEUS_ASN_LABEL])
: undefined;
usage.push({
country,
asn,
inboundBytes: entry.inboundBytes || 0,
tunnelTimeSec: entry.tunnelTimeSec || 0,
});
}
return usage;
}
Expand All @@ -105,7 +163,7 @@ export interface MetricsCollectorClient {
collectFeatureMetrics(reportJson: DailyFeatureMetricsReportJson): Promise<void>;
}

export class RestMetricsCollectorClient {
export class RestMetricsCollectorClient implements MetricsCollectorClient {
constructor(private serviceUrl: string) {}

collectServerUsageMetrics(reportJson: HourlyServerMetricsReportJson): Promise<void> {
Expand Down Expand Up @@ -163,7 +221,7 @@ export class OutlineSharedMetricsPublisher implements SharedMetricsPublisher {
return;
}
try {
await this.reportServerUsageMetrics(await usageMetrics.getCountryUsage());
await this.reportServerUsageMetrics(await usageMetrics.getLocationUsage());
usageMetrics.reset();
} catch (err) {
logging.error(`Failed to report server usage metrics: ${err}`);
Expand Down Expand Up @@ -197,24 +255,29 @@ export class OutlineSharedMetricsPublisher implements SharedMetricsPublisher {
return this.serverConfig.data().metricsEnabled || false;
}

private async reportServerUsageMetrics(countryUsageMetrics: CountryUsage[]): Promise<void> {
private async reportServerUsageMetrics(locationUsageMetrics: LocationUsage[]): Promise<void> {
const reportEndTimestampMs = this.clock.now();

const userReports = [] as HourlyUserMetricsReportJson[];
for (const countryUsage of countryUsageMetrics) {
if (countryUsage.inboundBytes === 0) {
const userReports: HourlyUserMetricsReportJson[] = [];
for (const locationUsage of locationUsageMetrics) {
if (locationUsage.inboundBytes === 0 && locationUsage.tunnelTimeSec === 0) {
continue;
}
if (isSanctionedCountry(countryUsage.country)) {
if (isSanctionedCountry(locationUsage.country)) {
continue;
}
// Make sure to always set a country, which is required by the metrics server validation.
// It's used to differentiate the row from the legacy key usage rows.
const country = countryUsage.country || 'ZZ';
userReports.push({
bytesTransferred: countryUsage.inboundBytes,
const country = locationUsage.country || 'ZZ';
const report: HourlyUserMetricsReportJson = {
countries: [country],
});
bytesTransferred: locationUsage.inboundBytes,
tunnelTimeSec: locationUsage.tunnelTimeSec,
};
if (locationUsage.asn) {
report.asn = locationUsage.asn;
}
userReports.push(report);
}
const report = {
serverId: this.serverConfig.data().serverId,
Expand Down
Loading