diff --git a/.changesets/add-heartbeat-check-ins.md b/.changesets/add-heartbeat-check-ins.md new file mode 100644 index 00000000..09b567da --- /dev/null +++ b/.changesets/add-heartbeat-check-ins.md @@ -0,0 +1,30 @@ +--- +bump: minor +type: add +--- + +Add support for heartbeat check-ins. + +Use the `checkIn.heartbeat` method to send a single heartbeat check-in event from your application. This can be used, for example, in your application's main loop: + +```js +import { checkIn } from "@appsignal/nodejs" + +while (true) { + checkIn.heartbeat("job_processor") + await processJob() +} +``` + +Heartbeats are deduplicated and sent asynchronously, without blocking the current thread. Regardless of how often the `.heartbeat` method is called, at most one heartbeat with the same identifier will be sent every ten seconds. + +Pass `{continuous: true}` as the second argument to send heartbeats continuously during the entire lifetime of the current process. This can be used, for example, after your application has finished its boot process: + +```js +import { checkIn } from "@appsignal/nodejs" + +function main() { + checkIn.heartbeat("job_processor", {continuous: true}) + startApp() +} +``` diff --git a/src/__tests__/check_in.test.ts b/src/__tests__/check_in.test.ts index 5ab80ae5..27ec01b1 100644 --- a/src/__tests__/check_in.test.ts +++ b/src/__tests__/check_in.test.ts @@ -1,21 +1,20 @@ import nock from "nock" -import { cron, Cron, Event, EventKind } from "../check_in" -import { - scheduler, - resetScheduler, - setDebounceTime, - resetDebounceTime, - debounceTime -} from "../check_in/scheduler" +import { cron, Cron, heartbeat } from "../check_in" +import { Event, EventKind } from "../check_in/event" +import { scheduler, resetScheduler, debounceTime } from "../check_in/scheduler" import { Client, Options } from "../client" import { - heartbeat, + heartbeat as deprecatedHeartbeat, Heartbeat, heartbeatClassWarnOnce, heartbeatHelperWarnOnce } from "../heartbeat" import { ndjsonParse } from "../utils" import type { InternalLogger } from "../internal_logger" +import { + heartbeatInterval, + killContinuousHeartbeats +} from "../check_in/heartbeat" const DEFAULT_CLIENT_CONFIG: Partial = { active: true, @@ -107,7 +106,7 @@ function spyOnInternalLogger( return spies as Required } -describe("checkIn.Cron", () => { +describe("Check-ins", () => { let client: Client let theCron: Cron let requests: Request[] @@ -122,7 +121,7 @@ describe("checkIn.Cron", () => { beforeEach(async () => { await resetScheduler() - resetDebounceTime() + debounceTime.reset() client = new Client(DEFAULT_CLIENT_CONFIG) @@ -137,7 +136,7 @@ describe("checkIn.Cron", () => { afterAll(async () => { await resetScheduler() - resetDebounceTime() + debounceTime.reset() nock.restore() }) @@ -229,7 +228,7 @@ describe("checkIn.Cron", () => { // These tests will mock requests one by one, so that // they can await their responses. nock.cleanAll() - setDebounceTime(() => 20) + debounceTime.set(() => 20) }) it("transmits events close to each other in time in a single request", async () => { @@ -342,7 +341,7 @@ describe("checkIn.Cron", () => { // Set a very long debounce time to ensure that the scheduler // is not awaiting it, but rather sending the events immediately // on shutdown. - setDebounceTime(() => 10000) + debounceTime.set(() => 10000) theCron.start() @@ -357,29 +356,29 @@ describe("checkIn.Cron", () => { it("uses the last transmission time to calculate the debounce", async () => { const request = mockOneCheckInRequest() - const debounceTime = jest.fn(_lastTransmission => 0) - setDebounceTime(debounceTime) + const debounceTimeMock = jest.fn(_lastTransmission => 0) + debounceTime.set(debounceTimeMock) theCron.start() - expect(debounceTime).toHaveBeenCalledTimes(1) - expect(debounceTime).toHaveBeenLastCalledWith(undefined) + expect(debounceTimeMock).toHaveBeenCalledTimes(1) + expect(debounceTimeMock).toHaveBeenLastCalledWith(undefined) await request const expectedLastTransmission = Date.now() theCron.finish() - expect(debounceTime).toHaveBeenCalledTimes(2) - expect(debounceTime).not.toHaveBeenLastCalledWith(undefined) + expect(debounceTimeMock).toHaveBeenCalledTimes(2) + expect(debounceTimeMock).not.toHaveBeenLastCalledWith(undefined) // Allow for some margin of error in the timing of the tests. - expect(debounceTime.mock.calls[1][0]).toBeGreaterThan( + expect(debounceTimeMock.mock.calls[1][0]).toBeGreaterThan( expectedLastTransmission - 20 ) }) describe("debounce time", () => { - beforeEach(resetDebounceTime) + beforeEach(debounceTime.reset) it("is short when no last transmission time is given", () => { expect(debounceTime(undefined)).toBe(100) @@ -452,7 +451,7 @@ describe("checkIn.Cron", () => { // This is to ensure that the scheduler will not send the start and // finish events together at shutdown, rather than in separate // requests. - setDebounceTime(() => { + debounceTime.set(() => { return 10000 }) @@ -477,7 +476,7 @@ describe("checkIn.Cron", () => { // This is to ensure that the scheduler will not send the start and // finish events together at shutdown, rather than in separate // requests. - setDebounceTime(() => 10000) + debounceTime.set(() => 10000) await expect( cron("test-cron-checkin", async () => { @@ -493,13 +492,63 @@ describe("checkIn.Cron", () => { }) }) + describe("Appsignal.checkIn.heartbeat()", () => { + beforeAll(() => { + heartbeatInterval.set(() => 20) + }) + + afterEach(() => { + killContinuousHeartbeats() + }) + + afterAll(() => { + heartbeatInterval.reset() + }) + + it("sends a heartbeat event", async () => { + heartbeat("test-heartbeat") + + await scheduler.shutdown() + + expect(requests).toHaveLength(1) + expectEvents(requests[0], [ + { + identifier: "test-heartbeat", + check_in_type: "heartbeat" + } + ]) + }) + + describe("with the `continuous: true` option", () => { + it("sends heartbeat events continuously", async () => { + debounceTime.set(() => 20) + + heartbeat("test-heartbeat", { continuous: true }) + + await sleep(60) + await scheduler.shutdown() + + expect(requests.length).toBeGreaterThanOrEqual(2) + + for (const request of requests) { + expectEvents(request, [ + { + identifier: "test-heartbeat", + check_in_type: "heartbeat" + } + ]) + } + }) + }) + }) + describe("Appsignal.heartbeat (deprecated)", () => { beforeEach(() => { heartbeatHelperWarnOnce.reset() }) it("behaves like Appsignal.checkIn.cron", async () => { - expect(heartbeat("test-cron-checkin")).toBeUndefined() + expect(deprecatedHeartbeat("test-cron-checkin")).toBeUndefined() await scheduler.shutdown() @@ -513,7 +562,7 @@ describe("checkIn.Cron", () => { .spyOn(Client.internalLogger, "warn") .mockImplementation() - expect(heartbeat("test-cron-checkin")).toBeUndefined() + expect(deprecatedHeartbeat("test-cron-checkin")).toBeUndefined() for (const spy of [consoleWarnSpy, internalLoggerWarnSpy]) { expect(spy.mock.calls).toHaveLength(1) @@ -529,7 +578,7 @@ describe("checkIn.Cron", () => { .spyOn(Client.internalLogger, "warn") .mockImplementation() - expect(heartbeat("test-cron-checkin")).toBeUndefined() + expect(deprecatedHeartbeat("test-cron-checkin")).toBeUndefined() for (const spy of [consoleWarnSpy, internalLoggerWarnSpy]) { expect(spy.mock.calls).toHaveLength(1) @@ -539,7 +588,7 @@ describe("checkIn.Cron", () => { spy.mockClear() } - expect(heartbeat("test-cron-checkin")).toBeUndefined() + expect(deprecatedHeartbeat("test-cron-checkin")).toBeUndefined() for (const spy of [consoleWarnSpy, internalLoggerWarnSpy]) { expect(spy.mock.calls).toHaveLength(0) diff --git a/src/__tests__/check_in/event.test.ts b/src/__tests__/check_in/event.test.ts new file mode 100644 index 00000000..ca7f43a3 --- /dev/null +++ b/src/__tests__/check_in/event.test.ts @@ -0,0 +1,198 @@ +import { Event, EventCheckInType } from "../../check_in/event" + +describe("Event", () => { + describe(".describe()", () => { + it("describes an empty list of events", () => { + expect(Event.describe([])).toBe("no check-in events") + }) + + it("describes a list of many events by count", () => { + const events = [ + new Event({ identifier: "1", check_in_type: "cron" }), + new Event({ identifier: "2", check_in_type: "cron" }) + ] + expect(Event.describe(events)).toBe("2 check-in events") + }) + + it("describes a single cron event", () => { + const event = new Event({ + identifier: "identifier", + check_in_type: "cron", + kind: "start", + digest: "some-digest" + }) + expect(Event.describe([event])).toBe( + "cron check-in `identifier` start event (digest some-digest)" + ) + }) + + it("describes a single heartbeat event", () => { + const event = new Event({ + identifier: "identifier", + check_in_type: "heartbeat" + }) + expect(Event.describe([event])).toBe( + "heartbeat check-in `identifier` event" + ) + }) + + it("describes a single unknown event", () => { + const event = new Event({ + identifier: "identifier", + check_in_type: "unknown" as EventCheckInType + }) + expect(Event.describe([event])).toBe("unknown check-in event") + }) + }) + + describe(".isRedundant()", () => { + it("returns false if the events have different types", () => { + const event = new Event({ identifier: "1", check_in_type: "cron" }) + const newEvent = new Event({ + identifier: "1", + check_in_type: "heartbeat" + }) + + expect(event.isRedundant(newEvent)).toBe(false) + }) + + it("returns false if the events have an unknown type", () => { + const event = new Event({ + identifier: "1", + check_in_type: "unknown" as EventCheckInType + }) + const newEvent = new Event({ + identifier: "1", + check_in_type: "unknown" as EventCheckInType + }) + + expect(event.isRedundant(newEvent)).toBe(false) + }) + + it("returns false if heartbeat events have different identifiers", () => { + const event = new Event({ identifier: "1", check_in_type: "heartbeat" }) + const newEvent = new Event({ + identifier: "2", + check_in_type: "heartbeat" + }) + + expect(event.isRedundant(newEvent)).toBe(false) + }) + + it("returns true if heartbeat events have the same identifier", () => { + const event = new Event({ identifier: "1", check_in_type: "heartbeat" }) + const newEvent = new Event({ + identifier: "1", + check_in_type: "heartbeat" + }) + + expect(event.isRedundant(newEvent)).toBe(true) + }) + + it("returns false if cron events have different identifiers", () => { + const event = new Event({ identifier: "1", check_in_type: "cron" }) + const newEvent = new Event({ identifier: "2", check_in_type: "cron" }) + + expect(event.isRedundant(newEvent)).toBe(false) + }) + + it("returns false if cron events have different digests", () => { + const event = new Event({ + identifier: "1", + digest: "1", + check_in_type: "cron" + }) + const newEvent = new Event({ + identifier: "1", + digest: "2", + check_in_type: "cron" + }) + + expect(event.isRedundant(newEvent)).toBe(false) + }) + + it("returns false if cron events have different kinds", () => { + const event = new Event({ + identifier: "1", + kind: "start", + check_in_type: "cron" + }) + const newEvent = new Event({ + identifier: "1", + kind: "finish", + check_in_type: "cron" + }) + + expect(event.isRedundant(newEvent)).toBe(false) + }) + + it("returns true if cron events have the same identifier, digest, and kind", () => { + const event = new Event({ + identifier: "1", + digest: "1", + kind: "start", + check_in_type: "cron" + }) + const newEvent = new Event({ + identifier: "1", + digest: "1", + kind: "start", + check_in_type: "cron" + }) + + expect(event.isRedundant(newEvent)).toBe(true) + }) + }) + + describe("JSON serialization", () => { + it("serializes a heartbeat event", () => { + const event = new Event({ + identifier: "1", + check_in_type: "heartbeat" + }) + + const serialised = JSON.stringify(event) + + expect(serialised).toMatch('"identifier":"1"') + expect(serialised).toMatch('"check_in_type":"heartbeat"') + expect(serialised).toMatch('"timestamp":') + expect(serialised).not.toMatch('"digest":') + expect(serialised).not.toMatch('"kind":') + + const parsed = JSON.parse(serialised) + + expect(parsed).toEqual({ + identifier: "1", + check_in_type: "heartbeat", + timestamp: expect.any(Number) + }) + }) + + it("serializes a cron event", () => { + const event = new Event({ + identifier: "1", + check_in_type: "cron", + kind: "start", + digest: "digest" + }) + + const serialised = JSON.stringify(event) + + expect(serialised).toMatch('"identifier":"1"') + expect(serialised).toMatch('"check_in_type":"cron"') + expect(serialised).toMatch('"kind":"start"') + expect(serialised).toMatch('"digest":"digest"') + expect(serialised).toMatch('"timestamp":') + + const parsed = JSON.parse(serialised) + + expect(parsed).toEqual({ + identifier: "1", + check_in_type: "cron", + kind: "start", + digest: "digest", + timestamp: expect.any(Number) + }) + }) + }) +}) diff --git a/src/check_in/cron.ts b/src/check_in/cron.ts index f0499c17..229bd136 100644 --- a/src/check_in/cron.ts +++ b/src/check_in/cron.ts @@ -1,15 +1,6 @@ import { scheduler } from "./scheduler" import crypto from "crypto" - -export type EventKind = "start" | "finish" - -export type Event = { - identifier: string - digest: string - kind: EventKind - timestamp: number - check_in_type: "cron" -} +import { Event } from "./event" export class Cron { identifier: string @@ -21,20 +12,10 @@ export class Cron { } public start(): void { - scheduler.schedule(this.event("start")) + scheduler.schedule(Event.cron(this, "start")) } public finish(): void { - scheduler.schedule(this.event("finish")) - } - - private event(kind: EventKind): Event { - return { - identifier: this.identifier, - digest: this.digest, - kind: kind, - timestamp: Math.floor(Date.now() / 1000), - check_in_type: "cron" - } + scheduler.schedule(Event.cron(this, "finish")) } } diff --git a/src/check_in/event.ts b/src/check_in/event.ts new file mode 100644 index 00000000..2be3a08a --- /dev/null +++ b/src/check_in/event.ts @@ -0,0 +1,101 @@ +import type { Cron } from "./cron" + +export type EventKind = "start" | "finish" +export type EventCheckInType = "cron" | "heartbeat" + +export class Event { + identifier: string + digest?: string + kind?: EventKind + timestamp: number + check_in_type: EventCheckInType + + constructor({ + identifier, + digest, + kind, + check_in_type + }: { + identifier: string + digest?: string + kind?: EventKind + check_in_type: EventCheckInType + }) { + this.identifier = identifier + this.digest = digest + this.kind = kind + this.timestamp = Math.floor(Date.now() / 1000) + this.check_in_type = check_in_type + } + + static cron(cron: Cron, kind: EventKind) { + return new Event({ + identifier: cron.identifier, + digest: cron.digest, + kind, + check_in_type: "cron" + }) + } + + static heartbeat(identifier: string) { + return new Event({ + identifier, + check_in_type: "heartbeat" + }) + } + + isRedundant(newEvent: Event): boolean { + if (this.check_in_type === "cron") { + // Consider any existing cron check-in event redundant if it has the + // same identifier, digest and kind as the one we're adding. + if ( + newEvent.check_in_type === "cron" && + this.identifier === newEvent.identifier && + this.kind === newEvent.kind && + this.digest === newEvent.digest + ) { + return true + } + } else if (this.check_in_type === "heartbeat") { + // Consider any existing heartbeat check-in event redundant if it has + // the same identifier as the one we're adding. + if ( + newEvent.check_in_type === "heartbeat" && + this.identifier === newEvent.identifier + ) { + return true + } + } + + return false + } + + static describe(events: Event[]): string { + const count = events.length + if (count === 0) { + // This shouldn't happen. + return "no check-in events" + } else if (count === 1) { + const event = events[0] + if (event.check_in_type === "cron") { + return ( + "cron check-in `" + + (event.identifier || "unknown") + + "` " + + (event.kind || "unknown") + + " event (digest " + + (event.digest || "unknown") + + ")" + ) + } else if (event.check_in_type === "heartbeat") { + return ( + "heartbeat check-in `" + (event.identifier || "unknown") + "` event" + ) + } else { + return "unknown check-in event" + } + } else { + return `${count} check-in events` + } + } +} diff --git a/src/check_in/heartbeat.ts b/src/check_in/heartbeat.ts new file mode 100644 index 00000000..1cb1297c --- /dev/null +++ b/src/check_in/heartbeat.ts @@ -0,0 +1,18 @@ +import { stubbable } from "../utils" + +const HEARTBEAT_INTERVAL_MILLISECONDS = 30_000 + +export const heartbeatInterval = stubbable( + () => HEARTBEAT_INTERVAL_MILLISECONDS +) + +let continuousHeartbeats: NodeJS.Timeout[] = [] + +export function addContinuousHeartbeat(interval: NodeJS.Timeout) { + continuousHeartbeats.push(interval) +} + +export function killContinuousHeartbeats() { + continuousHeartbeats.forEach(clearInterval) + continuousHeartbeats = [] +} diff --git a/src/check_in/index.ts b/src/check_in/index.ts index 23730e77..19f107d6 100644 --- a/src/check_in/index.ts +++ b/src/check_in/index.ts @@ -1,6 +1,11 @@ +import { scheduler } from "./scheduler" + import { Cron } from "./cron" export { Cron } -export type { EventKind, Event } from "./cron" + +import { Event } from "./event" + +import { heartbeatInterval, addContinuousHeartbeat } from "./heartbeat" export function cron(identifier: string): void export function cron(identifier: string, fn: () => T): T @@ -21,3 +26,16 @@ export function cron(identifier: string, fn?: () => T): T | undefined { return output } + +export function heartbeat( + identifier: string, + options?: { continuous: boolean } +): void { + if (options?.continuous) { + addContinuousHeartbeat( + setInterval(heartbeat, heartbeatInterval(), identifier).unref() + ) + } + + scheduler.schedule(Event.heartbeat(identifier)) +} diff --git a/src/check_in/scheduler.ts b/src/check_in/scheduler.ts index 34118ab8..9ed817ba 100644 --- a/src/check_in/scheduler.ts +++ b/src/check_in/scheduler.ts @@ -1,33 +1,25 @@ -import type { Event } from "./cron" +import { Event } from "./event" import { Transmitter } from "../transmitter" import { Client } from "../client" -import { ndjsonStringify } from "../utils" +import { ndjsonStringify, stubbable } from "../utils" const INITIAL_DEBOUNCE_MILLISECONDS = 100 const BETWEEN_TRANSMISSIONS_DEBOUNCE_MILLISECONDS = 10_000 -const originalDebounceTime = (lastTransmission: number | undefined): number => { - if (lastTransmission === undefined) { - return INITIAL_DEBOUNCE_MILLISECONDS - } - - const elapsed = Date.now() - lastTransmission - - return Math.max( - INITIAL_DEBOUNCE_MILLISECONDS, - BETWEEN_TRANSMISSIONS_DEBOUNCE_MILLISECONDS - elapsed - ) -} - -export let debounceTime: typeof originalDebounceTime = originalDebounceTime +export const debounceTime = stubbable( + (lastTransmission: number | undefined): number => { + if (lastTransmission === undefined) { + return INITIAL_DEBOUNCE_MILLISECONDS + } -export function setDebounceTime(fn: typeof originalDebounceTime) { - debounceTime = fn -} + const elapsed = Date.now() - lastTransmission -export function resetDebounceTime() { - debounceTime = originalDebounceTime -} + return Math.max( + INITIAL_DEBOUNCE_MILLISECONDS, + BETWEEN_TRANSMISSIONS_DEBOUNCE_MILLISECONDS - elapsed + ) + } +) class PendingPromiseSet extends Set> { add(promise: Promise) { @@ -61,19 +53,19 @@ export class Scheduler { schedule(event: Event) { if (Client.client === undefined || !Client.client.isActive) { Client.internalLogger.debug( - `Cannot schedule ${this.describe([event])}: AppSignal is not active` + `Cannot schedule ${Event.describe([event])}: AppSignal is not active` ) return } if (this.isShuttingDown) { Client.internalLogger.debug( - `Cannot schedule ${this.describe([event])}: AppSignal is stopped` + `Cannot schedule ${Event.describe([event])}: AppSignal is stopped` ) return } - Client.internalLogger.trace(`Scheduling ${this.describe([event])}`) + Client.internalLogger.trace(`Scheduling ${Event.describe([event])}`) this.addEvent(event) this.scheduleWaker() @@ -83,31 +75,17 @@ export class Scheduler { // Remove redundant events, keeping the newly added one, which // should be the one with the most recent timestamp. this.events = this.events.filter(existingEvent => { - return !this.isRedundantEvent(existingEvent, event) - }) - this.events.push(event) - } - - private isRedundantEvent(existingEvent: Event, newEvent: Event): boolean { - let isRedundant = false - - if (newEvent.check_in_type === "cron") { - // Consider any existing cron check-in event redundant if it has the - // same identifier, digest and kind as the one we're adding. - isRedundant = - existingEvent.identifier === newEvent.identifier && - existingEvent.kind === newEvent.kind && - existingEvent.digest === newEvent.digest && - existingEvent.check_in_type === "cron" - } + const isRedundant = existingEvent.isRedundant(event) - if (isRedundant) { - Client.internalLogger.debug( - `Replacing previously scheduled ${this.describe([existingEvent])}` - ) - } + if (isRedundant) { + Client.internalLogger.debug( + `Replacing previously scheduled ${Event.describe([existingEvent])}` + ) + } - return isRedundant + return !isRedundant + }) + this.events.push(event) } private scheduleWaker() { @@ -139,7 +117,7 @@ export class Scheduler { } private transmit(events: Event[]) { - const description = this.describe(events) + const description = Event.describe(events) const promise = new Transmitter( `${Client.config.data.loggingEndpoint}/check_ins/json`, @@ -166,31 +144,6 @@ export class Scheduler { this.pendingTransmissions.add(handledPromise) } - - private describe(events: Event[]): string { - const count = events.length - if (count === 0) { - // This shouldn't happen. - return "no check-in events" - } else if (count === 1) { - const event = events[0] - if (event.check_in_type === "cron") { - return ( - "cron check-in `" + - (event.identifier || "unknown") + - "` " + - (event.kind || "unknown") + - " event (digest " + - (event.digest || "unknown") + - ")" - ) - } else { - return "unknown check-in event" - } - } else { - return `${count} check-in events` - } - } } export let scheduler = new Scheduler() diff --git a/src/heartbeat.ts b/src/heartbeat.ts index dca32450..932f19e6 100644 --- a/src/heartbeat.ts +++ b/src/heartbeat.ts @@ -1,6 +1,6 @@ import { Client } from "./client" import { cron, Cron } from "./check_in" -export type { Event, EventKind } from "./check_in" +export type { Event, EventKind } from "./check_in/event" type OnceFn = { (): void diff --git a/src/utils.ts b/src/utils.ts index 34f4aefc..511ac570 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -67,3 +67,33 @@ export function ndjsonStringify(elements: any[]): string { export function ndjsonParse(data: string): any[] { return data.split("\n").map(line => JSON.parse(line)) } + +export type Stub any> = F & { + set: (fn: F) => void + reset: () => void +} + +export function stubbable( + original: (...args: T) => U +): Stub<(...args: T) => U> { + let current: (...args: T) => U = original + + const stub = { + [original.name]: function (...args: T): U { + return current(...args) + } + }[original.name] as Stub<(...args: T) => U> + + function set(fn: (...args: T) => U) { + current = fn + } + + function reset() { + current = original + } + + stub.set = set + stub.reset = reset + + return stub +}