From a61d16b46b5db519db87db1dd0e93b52bb65f908 Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Wed, 4 Sep 2024 15:59:30 +0200 Subject: [PATCH] Implement check-in scheduler Implement a check-in event scheduler, which schedules check-in events to be transmitted asynchronously in the background, with a minimum wait period of ten seconds between requests, and a wait period of a tenth of a second before the first request, referred to as "debounce" periods. This is a relatively minor improvement for the existing cron check-ins, but it is a requirement for the heartbeat check-ins, both to avoid slowing down customers' applications with blocking requests, and to avoid misuse of the feature from spamming our servers. The scheduler also acts as a deduplicator, removing "similar enough" check-in events -- again, not particularly interesting for cron check-ins, but a requirement to minimise damage when heartbeat check-ins are misused. Implement utility functions for serialising and deserialising NDJSON payloads. When an event is scheduled, the scheduler sets a timeout for the duration of the debounce period, after which the events stored in the scheduler are transmitted. When `Appsignal.stop` is called and awaited, the scheduler is gracefully stopped, awaiting the transmission of all scheduled check-in events. During this time, new check-in events cannot be scheduled. 
--- .changesets/check-in-scheduler.md | 6 + src/__tests__/check_in.test.ts | 482 +++++++++++++++++++++--------- src/__tests__/probes.test.ts | 4 - src/__tests__/utils.test.ts | 25 +- src/check_in.ts | 117 -------- src/check_in/cron.ts | 40 +++ src/check_in/index.ts | 23 ++ src/check_in/scheduler.ts | 201 +++++++++++++ src/client.ts | 5 +- src/utils.ts | 8 + 10 files changed, 649 insertions(+), 262 deletions(-) create mode 100644 .changesets/check-in-scheduler.md delete mode 100644 src/check_in.ts create mode 100644 src/check_in/cron.ts create mode 100644 src/check_in/index.ts create mode 100644 src/check_in/scheduler.ts diff --git a/.changesets/check-in-scheduler.md b/.changesets/check-in-scheduler.md new file mode 100644 index 00000000..4a1d7edc --- /dev/null +++ b/.changesets/check-in-scheduler.md @@ -0,0 +1,6 @@ +--- +bump: patch +type: change +--- + +Send check-ins concurrently. When calling `Appsignal.checkIn.cron`, instead of blocking the current process while the check-in events are sent, schedule them to be sent asynchronously in the background. 
diff --git a/src/__tests__/check_in.test.ts b/src/__tests__/check_in.test.ts index f38df11e..5ab80ae5 100644 --- a/src/__tests__/check_in.test.ts +++ b/src/__tests__/check_in.test.ts @@ -1,5 +1,12 @@ -import nock, { Scope } from "nock" -import { cron, Cron, EventKind } from "../check_in" +import nock from "nock" +import { cron, Cron, Event, EventKind } from "../check_in" +import { + scheduler, + resetScheduler, + setDebounceTime, + resetDebounceTime, + debounceTime +} from "../check_in/scheduler" import { Client, Options } from "../client" import { heartbeat, @@ -7,6 +14,8 @@ import { heartbeatClassWarnOnce, heartbeatHelperWarnOnce } from "../heartbeat" +import { ndjsonParse } from "../utils" +import type { InternalLogger } from "../internal_logger" const DEFAULT_CLIENT_CONFIG: Partial = { active: true, @@ -16,33 +25,38 @@ const DEFAULT_CLIENT_CONFIG: Partial = { hostname: "test-hostname" } -function mockCronCheckInRequest( - kind: EventKind, - { delay } = { delay: 0 } -): Scope { - return nock("https://appsignal-endpoint.net:443") - .post("/check_ins/json", body => { - return ( - body.identifier === "test-cron-checkin" && - body.kind === kind && - body.check_in_type === "cron" - ) - }) - .query({ - api_key: "test-push-api-key", - name: "Test App", - environment: "test", - hostname: "test-hostname" - }) - .delay(delay) - .reply(200, "") +type Request = Event[] + +function mockCheckInRequests(): Request[] { + const requests: Request[] = [] + + const appendCheckInRequests = async () => { + requests.push(await mockOneCheckInRequest()) + appendCheckInRequests() + } + + appendCheckInRequests() + + return requests } -function nextTick(fn: () => void): Promise { +function mockOneCheckInRequest( + customReply?: (interceptor: nock.Interceptor) => nock.Scope +): Promise { + const reply = customReply || (scope => scope.reply(200, "")) + return new Promise(resolve => { - process.nextTick(() => { - fn() - resolve() + const interceptor = 
nock("https://appsignal-endpoint.net:443") + .post("/check_ins/json") + .query({ + api_key: "test-push-api-key", + name: "Test App", + environment: "test", + hostname: "test-hostname" + }) + const scope = reply(interceptor) + scope.on("request", (_req, _interceptor, body: string) => { + resolve(ndjsonParse(body) as Event[]) }) }) } @@ -53,17 +67,50 @@ function sleep(ms: number): Promise { }) } -function interceptRequestBody(scope: Scope): Promise { - return new Promise(resolve => { - scope.on("request", (_req, _interceptor, body: string) => { - resolve(body) - }) - }) +function expectEvents(actual: Event[], expected: Partial[]): void { + expect(actual).toHaveLength(expected.length) + + for (let i = 0; i < expected.length; i++) { + const event = actual[i] + const expectedEvent = expected[i] + + for (const key in expectedEvent) { + expect(event[key as keyof Event]).toEqual( + expectedEvent[key as keyof Event] + ) + } + } +} + +function expectCronEvents(actual: Event[], expected: EventKind[]): void { + expectEvents( + actual, + expected.map(kind => ({ + identifier: "test-cron-checkin", + kind, + check_in_type: "cron" + })) + ) +} + +function spyOnInternalLogger( + client: Client +): Record { + const spies: Partial> = {} + + for (const level of ["error", "warn", "info", "debug", "trace"] as const) { + spies[level] = jest + .spyOn(client.internalLogger, level as keyof InternalLogger) + .mockImplementation() + } + + return spies as Required } describe("checkIn.Cron", () => { let client: Client let theCron: Cron + let requests: Request[] beforeAll(() => { theCron = new Cron("test-cron-checkin") @@ -73,18 +120,24 @@ describe("checkIn.Cron", () => { } }) - beforeEach(() => { + beforeEach(async () => { + await resetScheduler() + resetDebounceTime() + client = new Client(DEFAULT_CLIENT_CONFIG) - nock.cleanAll() nock.disableNetConnect() + requests = mockCheckInRequests() }) - afterEach(() => { - client.stop() + afterEach(async () => { + await client.stop() + nock.cleanAll() }) 
- afterAll(() => { + afterAll(async () => { + await resetScheduler() + resetDebounceTime() nock.restore() }) @@ -95,92 +148,267 @@ describe("checkIn.Cron", () => { active: false }) - const startScope = mockCronCheckInRequest("start") - const finishScope = mockCronCheckInRequest("finish") + const internalLoggerSpies = spyOnInternalLogger(client) + + theCron.start() + theCron.finish() - await expect(theCron.start()).resolves.toBeUndefined() - await expect(theCron.finish()).resolves.toBeUndefined() + expect(internalLoggerSpies.debug).toHaveBeenCalledTimes(2) + internalLoggerSpies.debug.mock.calls.forEach(call => { + expect(call[0]).toMatch(/^Cannot schedule cron check-in/) + expect(call[0]).toMatch(/: AppSignal is not active$/) + }) - expect(startScope.isDone()).toBe(false) - expect(finishScope.isDone()).toBe(false) + await scheduler.shutdown() + + expect(requests).toHaveLength(0) + }) + + it("does not transmit any events when AppSignal is shutting down", async () => { + await scheduler.shutdown() + + const internalLoggerSpies = spyOnInternalLogger(client) + + theCron.start() + theCron.finish() + + expect(internalLoggerSpies.debug).toHaveBeenCalledTimes(2) + internalLoggerSpies.debug.mock.calls.forEach(call => { + expect(call[0]).toMatch(/^Cannot schedule cron check-in/) + expect(call[0]).toMatch(/: AppSignal is stopped$/) + }) + + expect(requests).toHaveLength(0) }) it("cron.start() sends a cron check-in start event", async () => { - const scope = mockCronCheckInRequest("start") + const internalLoggerSpies = spyOnInternalLogger(client) - await expect(theCron.start()).resolves.toBeUndefined() + theCron.start() - scope.done() + expect(internalLoggerSpies.trace).toHaveBeenCalledTimes(1) + expect(internalLoggerSpies.trace.mock.calls[0][0]).toMatch( + /^Scheduling cron check-in `test-cron-checkin` start event/ + ) + + await scheduler.shutdown() + + expect(internalLoggerSpies.trace).toHaveBeenCalledTimes(2) + expect(internalLoggerSpies.trace.mock.calls[1][0]).toMatch( + 
/^Transmitted cron check-in `test-cron-checkin` start event/ + ) + + expect(requests).toHaveLength(1) + expectCronEvents(requests[0], ["start"]) }) it("cron.finish() sends a cron check-in finish event", async () => { - const scope = mockCronCheckInRequest("finish") + const internalLoggerSpies = spyOnInternalLogger(client) + + theCron.finish() - await expect(theCron.finish()).resolves.toBeUndefined() + expect(internalLoggerSpies.trace).toHaveBeenCalledTimes(1) + expect(internalLoggerSpies.trace.mock.calls[0][0]).toMatch( + /^Scheduling cron check-in `test-cron-checkin` finish event/ + ) - scope.done() + await scheduler.shutdown() + + expect(internalLoggerSpies.trace).toHaveBeenCalledTimes(2) + expect(internalLoggerSpies.trace.mock.calls[1][0]).toMatch( + /^Transmitted cron check-in `test-cron-checkin` finish event/ + ) + + expect(requests).toHaveLength(1) + expectCronEvents(requests[0], ["finish"]) }) - it("Cron.shutdown() awaits pending cron check-in event promises", async () => { - const startScope = mockCronCheckInRequest("start", { delay: 100 }) - const finishScope = mockCronCheckInRequest("finish", { delay: 200 }) + describe("Scheduler", () => { + beforeEach(() => { + // Remove the persistent mock for all requests. + // These tests will mock requests one by one, so that + // they can await their responses. 
+ nock.cleanAll() + setDebounceTime(() => 20) + }) + + it("transmits events close to each other in time in a single request", async () => { + const request = mockOneCheckInRequest() + + theCron.start() + theCron.finish() + + const internalLoggerSpies = spyOnInternalLogger(client) + + const events = await request + expectCronEvents(events, ["start", "finish"]) + + await scheduler.allSettled() + + expect(internalLoggerSpies.trace).toHaveBeenCalledTimes(1) + expect(internalLoggerSpies.trace).toHaveBeenLastCalledWith( + "Transmitted 2 check-in events" + ) + }) + + it("transmits events far apart in time in separate requests", async () => { + let request = mockOneCheckInRequest() + + theCron.start() + + let events = await request + expectCronEvents(events, ["start"]) + + request = mockOneCheckInRequest() + + theCron.finish() + + events = await request + expectCronEvents(events, ["finish"]) + }) + + it("does not transmit redundant events in the same request", async () => { + const request = mockOneCheckInRequest() + + theCron.start() + + const internalLoggerSpies = spyOnInternalLogger(client) - let finishPromiseResolved = false - let shutdownPromiseResolved = false + theCron.start() + + expect(internalLoggerSpies.trace).toHaveBeenCalledTimes(1) + expect(internalLoggerSpies.trace.mock.calls[0][0]).toMatch( + /^Scheduling cron check-in `test-cron-checkin` start event/ + ) + + expect(internalLoggerSpies.debug).toHaveBeenCalledTimes(1) + expect(internalLoggerSpies.debug.mock.calls[0][0]).toMatch( + /^Replacing previously scheduled cron check-in `test-cron-checkin` start event/ + ) - const startPromise = theCron.start() + const events = await request + expectCronEvents(events, ["start"]) + await scheduler.allSettled() - theCron.finish().then(() => { - finishPromiseResolved = true + expect(internalLoggerSpies.trace).toHaveBeenCalledTimes(2) + expect(internalLoggerSpies.trace.mock.calls[1][0]).toMatch( + /^Transmitted cron check-in `test-cron-checkin` start event/ + ) }) - const 
shutdownPromise = Cron.shutdown().then(() => { - shutdownPromiseResolved = true + it("logs an error when the request returns a non-2xx status code", async () => { + const request = mockOneCheckInRequest(interceptor => + interceptor.reply(500, "") + ) + + theCron.start() + + const internalLoggerSpies = spyOnInternalLogger(client) + + await request + await scheduler.allSettled() + + expect(internalLoggerSpies.error).toHaveBeenCalledTimes(1) + expect(internalLoggerSpies.error.mock.calls[0][0]).toMatch( + /^Failed to transmit cron check-in `test-cron-checkin` start event/ + ) + expect(internalLoggerSpies.error.mock.calls[0][0]).toMatch( + /: status code was 500$/ + ) }) - await expect(startPromise).resolves.toBeUndefined() + it("logs an error when the request fails", async () => { + const request = mockOneCheckInRequest(interceptor => + interceptor.replyWithError("something went wrong") + ) + + theCron.start() + + const internalLoggerSpies = spyOnInternalLogger(client) + + await request + await scheduler.allSettled() - // The finish promise should still be pending, so the shutdown promise - // should not be resolved yet. - await nextTick(() => { - expect(finishPromiseResolved).toBe(false) - expect(shutdownPromiseResolved).toBe(false) + expect(internalLoggerSpies.error).toHaveBeenCalledTimes(1) + expect(internalLoggerSpies.error.mock.calls[0][0]).toMatch( + /^Failed to transmit cron check-in `test-cron-checkin` start event/ + ) + expect(internalLoggerSpies.error.mock.calls[0][0]).toMatch( + /: something went wrong$/ + ) }) - startScope.done() + it("transmits scheduled events when the scheduler is shut down", async () => { + // Set a very long debounce time to ensure that the scheduler + // is not awaiting it, but rather sending the events immediately + // on shutdown. + setDebounceTime(() => 10000) + + theCron.start() + + const request = mockOneCheckInRequest() - // The shutdown promise should not resolve until the finish promise - // resolves. 
- await expect(shutdownPromise).resolves.toBeUndefined() + await scheduler.shutdown() - await nextTick(() => { - expect(finishPromiseResolved).toBe(true) + const events = await request + expectCronEvents(events, ["start"]) }) - finishScope.done() + it("uses the last transmission time to calculate the debounce", async () => { + const request = mockOneCheckInRequest() + + const debounceTime = jest.fn(_lastTransmission => 0) + setDebounceTime(debounceTime) + + theCron.start() + + expect(debounceTime).toHaveBeenCalledTimes(1) + expect(debounceTime).toHaveBeenLastCalledWith(undefined) + + await request + const expectedLastTransmission = Date.now() + + theCron.finish() + + expect(debounceTime).toHaveBeenCalledTimes(2) + expect(debounceTime).not.toHaveBeenLastCalledWith(undefined) + // Allow for some margin of error in the timing of the tests. + expect(debounceTime.mock.calls[1][0]).toBeGreaterThan( + expectedLastTransmission - 20 + ) + }) + + describe("debounce time", () => { + beforeEach(resetDebounceTime) + + it("is short when no last transmission time is given", () => { + expect(debounceTime(undefined)).toBe(100) + }) + + it("is short when the last transmission time was a long time ago", () => { + expect(debounceTime(0)).toBe(100) + }) + + it("is long when the last transmission time is very recent", () => { + // Allow for some margin of error in the timing of the tests. 
+ expect(debounceTime(Date.now())).toBeLessThanOrEqual(10000) + expect(debounceTime(Date.now())).toBeGreaterThan(10000 - 20) + }) + }) }) describe("Appsignal.checkIn.cron()", () => { it("without a function, sends a cron check-in finish event", async () => { - const startScope = mockCronCheckInRequest("start") - const finishScope = mockCronCheckInRequest("finish") - expect(cron("test-cron-checkin")).toBeUndefined() - await nextTick(() => { - expect(startScope.isDone()).toBe(false) - finishScope.done() - }) + await scheduler.shutdown() + + expect(requests).toHaveLength(1) + expectCronEvents(requests[0], ["finish"]) }) describe("with a function", () => { it("sends cron check-in start and finish events", async () => { - const startScope = mockCronCheckInRequest("start") - const startBody = interceptRequestBody(startScope) - - const finishScope = mockCronCheckInRequest("finish") - const finishBody = interceptRequestBody(finishScope) - expect( cron("test-cron-checkin", () => { const thisSecond = Math.floor(Date.now() / 1000) @@ -197,51 +425,39 @@ describe("checkIn.Cron", () => { }) ).toBe("output") - // Since the function is synchronous and deadlocks, the start and - // finish events' requests are actually initiated simultaneously - // afterwards, when the function finishes and the event loop ticks. 
- await nextTick(() => { - startScope.done() - finishScope.done() - }) + await scheduler.shutdown() - expect(JSON.parse(await finishBody).timestamp).toBeGreaterThan( - JSON.parse(await startBody).timestamp - ) + expect(requests).toHaveLength(1) + expectCronEvents(requests[0], ["start", "finish"]) + expect(requests[0][0].timestamp).toBeLessThan(requests[0][1].timestamp) }) it("does not send a finish event when the function throws an error", async () => { - const startScope = mockCronCheckInRequest("start") - const finishScope = mockCronCheckInRequest("finish") - expect(() => { cron("test-cron-checkin", () => { throw new Error("thrown") }) }).toThrow("thrown") - await nextTick(() => { - startScope.done() - expect(finishScope.isDone()).toBe(false) - }) + await scheduler.shutdown() + + expect(requests).toHaveLength(1) + expectCronEvents(requests[0], ["start"]) }) }) describe("with an async function", () => { it("sends cron check-in start and finish events", async () => { - const startScope = mockCronCheckInRequest("start") - const startBody = interceptRequestBody(startScope) - - const finishScope = mockCronCheckInRequest("finish") - const finishBody = interceptRequestBody(finishScope) + // Set a debounce time larger than the maximum sleep time. + // This is to ensure that the scheduler will not send the start and + // finish events together at shutdown, rather than in separate + // requests. 
+ setDebounceTime(() => { + return 10000 + }) await expect( cron("test-cron-checkin", async () => { - await nextTick(() => { - startScope.done() - expect(finishScope.isDone()).toBe(false) - }) - const millisecondsToNextSecond = 1000 - (Date.now() % 1000) await sleep(millisecondsToNextSecond) @@ -249,35 +465,30 @@ describe("checkIn.Cron", () => { }) ).resolves.toBe("output") - await nextTick(() => { - startScope.done() - finishScope.done() - }) + await scheduler.shutdown() - expect(JSON.parse(await finishBody).timestamp).toBeGreaterThan( - JSON.parse(await startBody).timestamp - ) + expect(requests).toHaveLength(1) + expectCronEvents(requests[0], ["start", "finish"]) + expect(requests[0][0].timestamp).toBeLessThan(requests[0][1].timestamp) }) it("does not send a finish event when the promise returned is rejected", async () => { - const startScope = mockCronCheckInRequest("start") - const finishScope = mockCronCheckInRequest("finish") + // Set a debounce time larger than the maximum sleep time. + // This is to ensure that the scheduler will not send the start and + // finish events together at shutdown, rather than in separate + // requests. 
+ setDebounceTime(() => 10000) await expect( cron("test-cron-checkin", async () => { - await nextTick(() => { - startScope.done() - expect(finishScope.isDone()).toBe(false) - }) - throw new Error("rejected") }) ).rejects.toThrow("rejected") - await nextTick(() => { - startScope.done() - expect(finishScope.isDone()).toBe(false) - }) + await scheduler.shutdown() + + expect(requests).toHaveLength(1) + expectCronEvents(requests[0], ["start"]) }) }) }) @@ -288,15 +499,12 @@ describe("checkIn.Cron", () => { }) it("behaves like Appsignal.checkIn.cron", async () => { - const startScope = mockCronCheckInRequest("start") - const finishScope = mockCronCheckInRequest("finish") - expect(heartbeat("test-cron-checkin")).toBeUndefined() - await nextTick(() => { - expect(startScope.isDone()).toBe(false) - finishScope.done() - }) + await scheduler.shutdown() + + expect(requests).toHaveLength(1) + expectCronEvents(requests[0], ["finish"]) }) it("emits a warning when called", async () => { diff --git a/src/__tests__/probes.test.ts b/src/__tests__/probes.test.ts index ceae232f..e49777f1 100644 --- a/src/__tests__/probes.test.ts +++ b/src/__tests__/probes.test.ts @@ -77,10 +77,6 @@ describe("Probes", () => { enableMinutelyProbes }) expect(client.metrics()).toBeInstanceOf(Metrics) - // Stop client so it doesn't actually export any metrics, which would - // cause the entire test to fail as the agent's not running - client.stop() - probes = client.metrics().probes() } diff --git a/src/__tests__/utils.test.ts b/src/__tests__/utils.test.ts index 8a981bb8..2574b5f5 100644 --- a/src/__tests__/utils.test.ts +++ b/src/__tests__/utils.test.ts @@ -1,4 +1,9 @@ -import { getAgentTimestamps, hrTime } from "../utils" +import { + getAgentTimestamps, + hrTime, + ndjsonStringify, + ndjsonParse +} from "../utils" import perf_hooks from "perf_hooks" describe("Utils", () => { @@ -27,4 +32,22 @@ describe("Utils", () => { }) }) }) + + describe("ndjsonStringify", () => { + it("stringifies elements into 
NDJSON", () => { + const elements = [{ foo: "bar" }, { baz: "qux" }] + const result = ndjsonStringify(elements) + + expect(result).toEqual('{"foo":"bar"}\n{"baz":"qux"}') + }) + }) + + describe("ndjsonParse", () => { + it("parses NDJSON into elements", () => { + const data = '{"foo":"bar"}\n{"baz":"qux"}' + const result = ndjsonParse(data) + + expect(result).toEqual([{ foo: "bar" }, { baz: "qux" }]) + }) + }) }) diff --git a/src/check_in.ts b/src/check_in.ts deleted file mode 100644 index 2ccae270..00000000 --- a/src/check_in.ts +++ /dev/null @@ -1,117 +0,0 @@ -import crypto from "crypto" -import { Transmitter } from "./transmitter" -import { Client } from "./client" - -export type EventKind = "start" | "finish" - -export type Event = { - identifier: string - digest: string - kind: EventKind - timestamp: number - check_in_type: "cron" -} - -class PendingPromiseSet extends Set> { - add(promise: Promise) { - super.add(promise) - promise.finally(() => this.delete(promise)) - return this - } - - async allSettled() { - await Promise.allSettled(this) - } -} - -export class Cron { - private static cronPromises = new PendingPromiseSet() - - identifier: string - digest: string - - constructor(identifier: string) { - this.identifier = identifier - this.digest = crypto.randomBytes(8).toString("hex") - } - - public static async shutdown() { - await Cron.cronPromises.allSettled() - } - - public start(): Promise { - return this.transmit(this.event("start")) - } - - public finish(): Promise { - return this.transmit(this.event("finish")) - } - - private event(kind: EventKind): Event { - return { - identifier: this.identifier, - digest: this.digest, - kind: kind, - timestamp: Math.floor(Date.now() / 1000), - check_in_type: "cron" - } - } - - private transmit(event: Event): Promise { - if (Client.client === undefined || !Client.client.isActive) { - Client.internalLogger.debug( - "AppSignal not active, not transmitting cron check-in event" - ) - return Promise.resolve() - } - - const 
promise = new Transmitter( - `${Client.config.data.loggingEndpoint}/check_ins/json`, - JSON.stringify(event) - ).transmit() - - const handledPromise = promise - .then(({ status }: { status: number }) => { - if (status >= 200 && status <= 299) { - Client.internalLogger.trace( - `Transmitted cron check-in \`${event.identifier}\` (${event.digest}) ${event.kind} event` - ) - } else { - Client.internalLogger.warn( - `Failed to transmit cron check-in ${event.kind} event: status code was ${status}` - ) - } - }) - .catch(({ error }: { error: Error }) => { - Client.internalLogger.warn( - `Failed to transmit cron check-in ${event.kind} event: ${error.message}` - ) - - return Promise.resolve() - }) - - Cron.cronPromises.add(handledPromise) - - return handledPromise - } -} - -export function cron(identifier: string): void -export function cron(identifier: string, fn: () => T): T -export function cron(identifier: string, fn?: () => T): T | undefined { - const cron = new Cron(identifier) - let output - - if (fn) { - cron.start() - output = fn() - } - - if (output instanceof Promise) { - output.then(() => cron.finish()).catch(() => {}) - } else { - cron.finish() - } - - return output -} diff --git a/src/check_in/cron.ts b/src/check_in/cron.ts new file mode 100644 index 00000000..f0499c17 --- /dev/null +++ b/src/check_in/cron.ts @@ -0,0 +1,40 @@ +import { scheduler } from "./scheduler" +import crypto from "crypto" + +export type EventKind = "start" | "finish" + +export type Event = { + identifier: string + digest: string + kind: EventKind + timestamp: number + check_in_type: "cron" +} + +export class Cron { + identifier: string + digest: string + + constructor(identifier: string) { + this.identifier = identifier + this.digest = crypto.randomBytes(8).toString("hex") + } + + public start(): void { + scheduler.schedule(this.event("start")) + } + + public finish(): void { + scheduler.schedule(this.event("finish")) + } + + private event(kind: EventKind): Event { + return { + 
identifier: this.identifier, + digest: this.digest, + kind: kind, + timestamp: Math.floor(Date.now() / 1000), + check_in_type: "cron" + } + } +} diff --git a/src/check_in/index.ts b/src/check_in/index.ts new file mode 100644 index 00000000..23730e77 --- /dev/null +++ b/src/check_in/index.ts @@ -0,0 +1,23 @@ +import { Cron } from "./cron" +export { Cron } +export type { EventKind, Event } from "./cron" + +export function cron(identifier: string): void +export function cron(identifier: string, fn: () => T): T +export function cron(identifier: string, fn?: () => T): T | undefined { + const cron = new Cron(identifier) + let output + + if (fn) { + cron.start() + output = fn() + } + + if (output instanceof Promise) { + output.then(() => cron.finish()).catch(() => {}) + } else { + cron.finish() + } + + return output +} diff --git a/src/check_in/scheduler.ts b/src/check_in/scheduler.ts new file mode 100644 index 00000000..34118ab8 --- /dev/null +++ b/src/check_in/scheduler.ts @@ -0,0 +1,201 @@ +import type { Event } from "./cron" +import { Transmitter } from "../transmitter" +import { Client } from "../client" +import { ndjsonStringify } from "../utils" + +const INITIAL_DEBOUNCE_MILLISECONDS = 100 +const BETWEEN_TRANSMISSIONS_DEBOUNCE_MILLISECONDS = 10_000 + +const originalDebounceTime = (lastTransmission: number | undefined): number => { + if (lastTransmission === undefined) { + return INITIAL_DEBOUNCE_MILLISECONDS + } + + const elapsed = Date.now() - lastTransmission + + return Math.max( + INITIAL_DEBOUNCE_MILLISECONDS, + BETWEEN_TRANSMISSIONS_DEBOUNCE_MILLISECONDS - elapsed + ) +} + +export let debounceTime: typeof originalDebounceTime = originalDebounceTime + +export function setDebounceTime(fn: typeof originalDebounceTime) { + debounceTime = fn +} + +export function resetDebounceTime() { + debounceTime = originalDebounceTime +} + +class PendingPromiseSet extends Set> { + add(promise: Promise) { + super.add(promise) + promise.finally(() => this.delete(promise)) + 
return this + } + + async allSettled() { + await Promise.allSettled(this) + } +} + +export class Scheduler { + pendingTransmissions: PendingPromiseSet = new PendingPromiseSet() + events: Event[] = [] + waker?: NodeJS.Timeout + lastTransmission?: number + isShuttingDown = false + + async shutdown() { + this.isShuttingDown = true + this.runWaker() + await this.allSettled() + } + + allSettled(): Promise { + return this.pendingTransmissions.allSettled() + } + + schedule(event: Event) { + if (Client.client === undefined || !Client.client.isActive) { + Client.internalLogger.debug( + `Cannot schedule ${this.describe([event])}: AppSignal is not active` + ) + return + } + + if (this.isShuttingDown) { + Client.internalLogger.debug( + `Cannot schedule ${this.describe([event])}: AppSignal is stopped` + ) + return + } + + Client.internalLogger.trace(`Scheduling ${this.describe([event])}`) + + this.addEvent(event) + this.scheduleWaker() + } + + private addEvent(event: Event) { + // Remove redundant events, keeping the newly added one, which + // should be the one with the most recent timestamp. + this.events = this.events.filter(existingEvent => { + return !this.isRedundantEvent(existingEvent, event) + }) + this.events.push(event) + } + + private isRedundantEvent(existingEvent: Event, newEvent: Event): boolean { + let isRedundant = false + + if (newEvent.check_in_type === "cron") { + // Consider any existing cron check-in event redundant if it has the + // same identifier, digest and kind as the one we're adding. 
+ isRedundant = + existingEvent.identifier === newEvent.identifier && + existingEvent.kind === newEvent.kind && + existingEvent.digest === newEvent.digest && + existingEvent.check_in_type === "cron" + } + + if (isRedundant) { + Client.internalLogger.debug( + `Replacing previously scheduled ${this.describe([existingEvent])}` + ) + } + + return isRedundant + } + + private scheduleWaker() { + if (this.waker === undefined) { + this.waker = setTimeout( + () => this.runWaker(), + debounceTime(this.lastTransmission) + ) + // Ensure the debounce period is not awaited when the Node.js process is + // shutting down -- users must call Appsignal.stop() to gracefully await the + // transmission of scheduled events. + this.waker.unref() + } + } + + private runWaker() { + this.lastTransmission = Date.now() + if (this.waker !== undefined) { + clearTimeout(this.waker) + this.waker = undefined + } + + const events = this.events + this.events = [] + if (events.length !== 0) { + // The events array may be empty when this function is called on shutdown. + this.transmit(events) + } + } + + private transmit(events: Event[]) { + const description = this.describe(events) + + const promise = new Transmitter( + `${Client.config.data.loggingEndpoint}/check_ins/json`, + ndjsonStringify(events) + ).transmit() + + const handledPromise = promise + .then(({ status }: { status: number }) => { + if (status >= 200 && status <= 299) { + Client.internalLogger.trace(`Transmitted ${description}`) + } else { + Client.internalLogger.error( + `Failed to transmit ${description}: status code was ${status}` + ) + } + }) + .catch(({ error }: { error: Error }) => { + Client.internalLogger.error( + `Failed to transmit ${description}: ${error.message}` + ) + + return Promise.resolve() + }) + + this.pendingTransmissions.add(handledPromise) + } + + private describe(events: Event[]): string { + const count = events.length + if (count === 0) { + // This shouldn't happen. 
+ return "no check-in events" + } else if (count === 1) { + const event = events[0] + if (event.check_in_type === "cron") { + return ( + "cron check-in `" + + (event.identifier || "unknown") + + "` " + + (event.kind || "unknown") + + " event (digest " + + (event.digest || "unknown") + + ")" + ) + } else { + return "unknown check-in event" + } + } else { + return `${count} check-in events` + } + } +} + +export let scheduler = new Scheduler() + +export async function resetScheduler() { + await scheduler.shutdown() + scheduler = new Scheduler() +} diff --git a/src/client.ts b/src/client.ts index 34c155a9..f2bf3568 100644 --- a/src/client.ts +++ b/src/client.ts @@ -46,7 +46,7 @@ import { } from "@opentelemetry/instrumentation-restify" import { UndiciInstrumentation } from "@opentelemetry/instrumentation-undici" import { SpanProcessor, TestModeSpanProcessor } from "./span_processor" -import { Cron } from "./check_in" +import { scheduler } from "./check_in/scheduler" const DefaultInstrumentations = { "@appsignal/opentelemetry-instrumentation-bullmq": BullMQInstrumentation, @@ -217,10 +217,9 @@ export class Client { console.log("Stopping AppSignal") } - await this.#sdk?.shutdown() this.metrics().probes().stop() + await Promise.all([this.#sdk?.shutdown(), scheduler.shutdown()]) this.extension.stop() - await Cron.shutdown() } /** diff --git a/src/utils.ts b/src/utils.ts index 6d5e89b3..34f4aefc 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -59,3 +59,11 @@ function numberToHrtime(epochMillis: number) { export function processGetuid() { return (process.getuid ?? (() => -1))() } + +export function ndjsonStringify(elements: any[]): string { + return elements.map(element => JSON.stringify(element)).join("\n") +} + +export function ndjsonParse(data: string): any[] { + return data.split("\n").map(line => JSON.parse(line)) +}