Skip to content

Commit

Permalink
feat: Instrument Deno.serve (#26964)
Browse files Browse the repository at this point in the history
Add basic trace to Deno.serve. Also updates a bit of the testing infra
to make it easier to deal with.
  • Loading branch information
devsnek authored and bartlomieju committed Nov 28, 2024
1 parent 3f0e7c6 commit f64aa29
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 145 deletions.
193 changes: 122 additions & 71 deletions ext/http/00_serve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ const {
ObjectHasOwn,
ObjectPrototypeIsPrototypeOf,
PromisePrototypeCatch,
SafePromisePrototypeFinally,
PromisePrototypeThen,
String,
StringPrototypeIncludes,
StringPrototypeSlice,
Symbol,
TypeError,
TypedArrayPrototypeGetSymbolToStringTag,
Expand Down Expand Up @@ -513,91 +516,139 @@ function fastSyncResponseOrStream(
* This function returns a promise that will only reject in the case of abnormal exit.
*/
function mapToCallback(context, callback, onError) {
return async function (req) {
const asyncContext = getAsyncContext();
setAsyncContext(context.asyncContext);

let mapped = async function (req, span) {
// Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback
// 500 error.
let innerRequest;
let response;
try {
// Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback
// 500 error.
let innerRequest;
let response;
try {
innerRequest = new InnerRequest(req, context);
const request = fromInnerRequest(innerRequest, "immutable");
innerRequest.request = request;
response = await callback(
request,
new ServeHandlerInfo(innerRequest),
innerRequest = new InnerRequest(req, context);
const request = fromInnerRequest(innerRequest, "immutable");
innerRequest.request = request;

if (span) {
span.updateName(request.method);
span.setAttribute("http.request.method", request.method);
const url = new URL(request.url);
span.setAttribute("url.full", request.url);
span.setAttribute(
"url.scheme",
StringPrototypeSlice(url.protocol, 0, -1),
);
span.setAttribute("url.path", url.pathname);
span.setAttribute("url.query", StringPrototypeSlice(url.search, 1));
}

// Throwing Error if the handler return value is not a Response class
if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) {
throw new TypeError(
"Return value from serve handler must be a response or a promise resolving to a response",
);
}
response = await callback(
request,
new ServeHandlerInfo(innerRequest),
);

if (response.type === "error") {
throw new TypeError(
"Return value from serve handler must not be an error response (like Response.error())",
);
}
// Throwing Error if the handler return value is not a Response class
if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) {
throw new TypeError(
"Return value from serve handler must be a response or a promise resolving to a response",
);
}

if (response.type === "error") {
throw new TypeError(
"Return value from serve handler must not be an error response (like Response.error())",
);
}

if (response.bodyUsed) {
if (response.bodyUsed) {
throw new TypeError(
"The body of the Response returned from the serve handler has already been consumed",
);
}
} catch (error) {
try {
response = await onError(error);
if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) {
throw new TypeError(
"The body of the Response returned from the serve handler has already been consumed",
"Return value from onError handler must be a response or a promise resolving to a response",
);
}
} catch (error) {
try {
response = await onError(error);
if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) {
throw new TypeError(
"Return value from onError handler must be a response or a promise resolving to a response",
);
}
} catch (error) {
// deno-lint-ignore no-console
console.error("Exception in onError while handling exception", error);
response = internalServerError();
}
}
const inner = toInnerResponse(response);
if (innerRequest?.[_upgraded]) {
// We're done here as the connection has been upgraded during the callback and no longer requires servicing.
if (response !== UPGRADE_RESPONSE_SENTINEL) {
// deno-lint-ignore no-console
console.error("Upgrade response was not returned from callback");
context.close();
}
innerRequest?.[_upgraded]();
return;
// deno-lint-ignore no-console
console.error("Exception in onError while handling exception", error);
response = internalServerError();
}
}

// Did everything shut down while we were waiting?
if (context.closed) {
// We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate
innerRequest?.close();
op_http_set_promise_complete(req, 503);
return;
}
if (span) {
span.setAttribute(
"http.response.status_code",
String(response.status),
);
}

const status = inner.status;
const headers = inner.headerList;
if (headers && headers.length > 0) {
if (headers.length == 1) {
op_http_set_response_header(req, headers[0][0], headers[0][1]);
} else {
op_http_set_response_headers(req, headers);
}
const inner = toInnerResponse(response);
if (innerRequest?.[_upgraded]) {
// We're done here as the connection has been upgraded during the callback and no longer requires servicing.
if (response !== UPGRADE_RESPONSE_SENTINEL) {
// deno-lint-ignore no-console
console.error("Upgrade response was not returned from callback");
context.close();
}
innerRequest?.[_upgraded]();
return;
}

fastSyncResponseOrStream(req, inner.body, status, innerRequest);
} finally {
setAsyncContext(asyncContext);
// Did everything shut down while we were waiting?
if (context.closed) {
// We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate
innerRequest?.close();
op_http_set_promise_complete(req, 503);
return;
}

const status = inner.status;
const headers = inner.headerList;
if (headers && headers.length > 0) {
if (headers.length == 1) {
op_http_set_response_header(req, headers[0][0], headers[0][1]);
} else {
op_http_set_response_headers(req, headers);
}
}

fastSyncResponseOrStream(req, inner.body, status, innerRequest);
};

if (internals.telemetry.tracingEnabled) {
const { Span, enterSpan, endSpan } = internals.telemetry;
const origMapped = mapped;
mapped = function (req, _span) {
const oldCtx = getAsyncContext();
setAsyncContext(context.asyncContext);
const span = new Span("deno.serve");
try {
enterSpan(span);
return SafePromisePrototypeFinally(
origMapped(req, span),
() => endSpan(span),
);
} finally {
// equiv to exitSpan.
setAsyncContext(oldCtx);
}
};
} else {
const origMapped = mapped;
mapped = function (req, span) {
const oldCtx = getAsyncContext();
setAsyncContext(context.asyncContext);
try {
return origMapped(req, span);
} finally {
setAsyncContext(oldCtx);
}
};
}

return mapped;
}

type RawHandler = (
Expand Down Expand Up @@ -795,7 +846,7 @@ function serveHttpOn(context, addr, callback) {
// Attempt to pull as many requests out of the queue as possible before awaiting. This API is
// a synchronous, non-blocking API that returns u32::MAX if anything goes wrong.
while ((req = op_http_try_wait(rid)) !== null) {
PromisePrototypeCatch(callback(req), promiseErrorHandler);
PromisePrototypeCatch(callback(req, undefined), promiseErrorHandler);
}
currentPromise = op_http_wait(rid);
if (!ref) {
Expand All @@ -815,7 +866,7 @@ function serveHttpOn(context, addr, callback) {
if (req === null) {
break;
}
PromisePrototypeCatch(callback(req), promiseErrorHandler);
PromisePrototypeCatch(callback(req, undefined), promiseErrorHandler);
}

try {
Expand Down
66 changes: 29 additions & 37 deletions runtime/js/telemetry.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

import { core, primordials } from "ext:core/mod.js";
import { core, internals, primordials } from "ext:core/mod.js";
import {
op_crypto_get_random_values,
op_otel_instrumentation_scope_create_and_enter,
Expand Down Expand Up @@ -32,11 +32,9 @@ const {
ObjectDefineProperty,
WeakRefPrototypeDeref,
String,
StringPrototypePadStart,
ObjectPrototypeIsPrototypeOf,
DataView,
DataViewPrototypeSetUint32,
SafeWeakRef,
TypedArrayPrototypeGetBuffer,
} = primordials;
const { AsyncVariable, setAsyncContext } = core;

Expand Down Expand Up @@ -404,7 +402,7 @@ export class Span {
span.#asyncContext = NO_ASYNC_CONTEXT;
};

exitSpan = (span: Span) => {
endSpan = (span: Span) => {
const endTime = now();
submit(
span.#spanId,
Expand Down Expand Up @@ -449,39 +447,11 @@ export class Span {
const currentSpan: Span | {
spanContext(): { traceId: string; spanId: string };
} = CURRENT.get()?.getValue(SPAN_KEY);
if (!currentSpan) {
const buffer = new Uint8Array(TRACE_ID_BYTES + SPAN_ID_BYTES);
if (DETERMINISTIC) {
DataViewPrototypeSetUint32(
new DataView(TypedArrayPrototypeGetBuffer(buffer)),
TRACE_ID_BYTES - 4,
COUNTER,
true,
);
COUNTER += 1;
DataViewPrototypeSetUint32(
new DataView(TypedArrayPrototypeGetBuffer(buffer)),
TRACE_ID_BYTES + SPAN_ID_BYTES - 4,
COUNTER,
true,
);
COUNTER += 1;
} else {
op_crypto_get_random_values(buffer);
}
this.#traceId = TypedArrayPrototypeSubarray(buffer, 0, TRACE_ID_BYTES);
this.#spanId = TypedArrayPrototypeSubarray(buffer, TRACE_ID_BYTES);
} else {
this.#spanId = new Uint8Array(SPAN_ID_BYTES);
if (currentSpan) {
if (DETERMINISTIC) {
DataViewPrototypeSetUint32(
new DataView(TypedArrayPrototypeGetBuffer(this.#spanId)),
SPAN_ID_BYTES - 4,
COUNTER,
true,
);
COUNTER += 1;
this.#spanId = StringPrototypePadStart(String(COUNTER++), 16, "0");
} else {
this.#spanId = new Uint8Array(SPAN_ID_BYTES);
op_crypto_get_random_values(this.#spanId);
}
// deno-lint-ignore prefer-primordials
Expand All @@ -493,6 +463,16 @@ export class Span {
this.#traceId = context.traceId;
this.#parentSpanId = context.spanId;
}
} else {
if (DETERMINISTIC) {
this.#traceId = StringPrototypePadStart(String(COUNTER++), 32, "0");
this.#spanId = StringPrototypePadStart(String(COUNTER++), 16, "0");
} else {
const buffer = new Uint8Array(TRACE_ID_BYTES + SPAN_ID_BYTES);
op_crypto_get_random_values(buffer);
this.#traceId = TypedArrayPrototypeSubarray(buffer, 0, TRACE_ID_BYTES);
this.#spanId = TypedArrayPrototypeSubarray(buffer, TRACE_ID_BYTES);
}
}
}

Expand Down Expand Up @@ -717,4 +697,16 @@ export function bootstrap(
}
}

export const telemetry = { SpanExporter, ContextManager };
export const telemetry = {
SpanExporter,
ContextManager,
};
internals.telemetry = {
Span,
enterSpan,
exitSpan,
endSpan,
get tracingEnabled() {
return TRACING_ENABLED;
},
};
6 changes: 3 additions & 3 deletions runtime/ops/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,9 +835,9 @@ fn op_otel_span_set_dropped(
#[smi] dropped_events_count: u32,
) {
if let Some(temporary_span) = state.try_borrow_mut::<TemporarySpan>() {
temporary_span.0.dropped_attributes_count = dropped_attributes_count;
temporary_span.0.links.dropped_count = dropped_links_count;
temporary_span.0.events.dropped_count = dropped_events_count;
temporary_span.0.dropped_attributes_count += dropped_attributes_count;
temporary_span.0.links.dropped_count += dropped_links_count;
temporary_span.0.events.dropped_count += dropped_events_count;
}
}

Expand Down
12 changes: 0 additions & 12 deletions tests/specs/cli/otel_basic/__test__.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,18 @@
"steps": [
{
"args": "run -A main.ts basic.ts",
"envs": {
"DENO_UNSTABLE_OTEL_DETERMINISTIC": "1"
},
"output": "basic.out"
},
{
"args": "run -A main.ts natural_exit.ts",
"envs": {
"DENO_UNSTABLE_OTEL_DETERMINISTIC": "1"
},
"output": "natural_exit.out"
},
{
"args": "run -A main.ts deno_dot_exit.ts",
"envs": {
"DENO_UNSTABLE_OTEL_DETERMINISTIC": "1"
},
"output": "deno_dot_exit.out"
},
{
"args": "run -A main.ts uncaught.ts",
"envs": {
"DENO_UNSTABLE_OTEL_DETERMINISTIC": "1"
},
"output": "uncaught.out"
}
]
Expand Down
Loading

0 comments on commit f64aa29

Please sign in to comment.