Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: Support for metrics collection - Prometheus and Open Telemetry #3420

Merged
merged 5 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
639 changes: 639 additions & 0 deletions metrics/grafana/grafana.dashboard.app.json.txt

Large diffs are not rendered by default.

1,017 changes: 1,017 additions & 0 deletions metrics/grafana/grafana.dashboard.server.json.txt

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions metrics/otel/compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
version: "2"
services:
otel-collector:
image: otel/opentelemetry-collector-contrib
command: ["--config=/etc/otelcol-contrib/config.yaml", "--feature-gates=-exporter.datadogexporter.DisableAPMStats", "${OTELCOL_ARGS}"]
volumes:
- ./otel.config.yml:/etc/otelcol-contrib/config.yaml
ports:
- 1888:1888 # pprof extension
- 8888:8888 # Prometheus metrics exposed by the Collector
- 8889:8889 # Prometheus exporter metrics
- 13133:13133 # health_check extension
- 4317:4317 # OTLP gRPC receiver
- 4318:4318 # OTLP http receiver
- 55679:55679 # zpages extension
72 changes: 72 additions & 0 deletions metrics/otel/otel.config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
receivers:
otlp:
protocols:
http:
endpoint: 0.0.0.0:4318
grpc:
endpoint: 0.0.0.0:4317
# The hostmetrics receiver is required to get correct infrastructure metrics in Datadog.
hostmetrics:
collection_interval: 10s
scrapers:
paging:
metrics:
system.paging.utilization:
enabled: true
cpu:
metrics:
system.cpu.utilization:
enabled: true
disk:
filesystem:
metrics:
system.filesystem.utilization:
enabled: true
load:
memory:
network:

# The prometheus receiver scrapes metrics needed for the OpenTelemetry Collector Dashboard.
prometheus:
config:
scrape_configs:
- job_name: 'otelcol'
scrape_interval: 10s
static_configs:
- targets: ['0.0.0.0:8888']

filelog:
include_file_path: true
poll_interval: 10s
include:
- /var/log/**/*example*/*.log

processors:
batch:
send_batch_max_size: 100
send_batch_size: 10
timeout: 10s

connectors:
datadog/connector:

exporters:
datadog/exporter:
api:
site: us5.datadoghq.com
key: "4f5778c6cf5381df89606004b4cacbc9"

service:
pipelines:
metrics:
receivers: [datadog/connector, hostmetrics, prometheus, otlp]
processors: [batch]
exporters: [datadog/exporter]
traces:
receivers: [otlp]
processors: [batch]
exporters: [datadog/connector, datadog/exporter]
logs:
receivers: [otlp, filelog]
processors: [batch]
exporters: [datadog/exporter]
9 changes: 9 additions & 0 deletions metrics/prometheus/prometheus.config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
global:
scrape_interval: 5s
scrape_configs:
- job_name: "FlowiseAI"
static_configs:
- targets: ["localhost:8080","localhost:3000"]

metrics_path: /api/v1/metrics/
scheme: http
13 changes: 13 additions & 0 deletions packages/server/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,16 @@ PORT=3000

# APIKEY_STORAGE_TYPE=json (json | db)
# SHOW_COMMUNITY_NODES=true

######################
# METRICS COLLECTION
#######################
ENABLE_METRICS=false
vinodkiran marked this conversation as resolved.
Show resolved Hide resolved
METRICS_PROVIDER=prometheus # prometheus | open_telemetry
METRICS_INCLUDE_NODE_METRICS="false" # default is true
METRICS_SERVICE_NAME="FlowiseAI" # default is 'FlowiseAI'

# ONLY NEEDED if METRICS_PROVIDER=open_telemetry
METRICS_OPEN_TELEMETRY_METRIC_ENDPOINT="http://localhost:4318/v1/metrics"
METRICS_OPEN_TELEMETRY_PROTOCOL="http" # http | grpc | proto (default is http)
METRICS_OPEN_TELEMETRY_DEBUG="false" # default is false
15 changes: 15 additions & 0 deletions packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@
"license": "SEE LICENSE IN LICENSE.md",
"dependencies": {
"@oclif/core": "^1.13.10",
"@opentelemetry/api": "^1.3.0",
"@opentelemetry/core": "1.27.0",
"@opentelemetry/exporter-metrics-otlp-grpc": "0.54.0",
"@opentelemetry/exporter-metrics-otlp-http": "0.54.0",
"@opentelemetry/exporter-metrics-otlp-proto": "0.54.0",
"@opentelemetry/exporter-trace-otlp-grpc": "0.54.0",
"@opentelemetry/exporter-trace-otlp-http": "0.54.0",
"@opentelemetry/exporter-trace-otlp-proto": "0.54.0",
"@opentelemetry/resources": "1.27.0",
"@opentelemetry/sdk-metrics": "1.27.0",
"@opentelemetry/sdk-trace-base": "1.27.0",
"@opentelemetry/semantic-conventions": "1.27.0",
"@opentelemetry/auto-instrumentations-node": "^0.52.0",
"@opentelemetry/sdk-node": "^0.54.0",
"@types/lodash": "^4.14.202",
"@types/uuid": "^9.0.7",
"async-mutex": "^0.4.0",
Expand All @@ -79,6 +93,7 @@
"openai": "^4.51.0",
"pg": "^8.11.1",
"posthog-node": "^3.5.0",
"prom-client": "^15.1.3",
"reflect-metadata": "^0.1.13",
"sanitize-html": "^2.11.0",
"socket.io": "^4.6.1",
Expand Down
24 changes: 24 additions & 0 deletions packages/server/src/Interface.Metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
export interface IMetricsProvider {
getName(): string
initializeCounters(): void
setupMetricsEndpoint(): void
incrementCounter(counter: FLOWISE_COUNTER, payload: any): void
}

export enum FLOWISE_COUNTER_STATUS {
SUCCESS = 'success',
FAILURE = 'failure'
}

export enum FLOWISE_COUNTER {
CHATFLOW_CREATED = 'chatflow_created',
AGENTFLOW_CREATED = 'agentflow_created',
ASSISTANT_CREATED = 'assistant_created',
TOOL_CREATED = 'tool_created',

CHATFLOW_PREDICTION_INTERNAL = 'chatflow_prediction_internal',
CHATFLOW_PREDICTION_EXTERNAL = 'chatflow_prediction_external',

AGENTFLOW_PREDICTION_INTERNAL = 'agentflow_prediction_internal',
AGENTFLOW_PREDICTION_EXTERNAL = 'agentflow_prediction_external'
}
29 changes: 28 additions & 1 deletion packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import flowiseApiV1Router from './routes'
import errorHandlerMiddleware from './middlewares/errors'
import { SSEStreamer } from './utils/SSEStreamer'
import { validateAPIKey } from './utils/validateKey'
import { IMetricsProvider } from './Interface.Metrics'
import { Prometheus } from './metrics/Prometheus'
import { OpenTelemetry } from './metrics/OpenTelemetry'

declare global {
namespace Express {
Expand All @@ -39,6 +42,7 @@ export class App {
telemetry: Telemetry
AppDataSource: DataSource = getDataSource()
sseStreamer: SSEStreamer
metricsProvider: IMetricsProvider

constructor() {
this.app = express()
Expand Down Expand Up @@ -137,7 +141,8 @@ export class App {
'/api/v1/ip',
'/api/v1/ping',
'/api/v1/version',
'/api/v1/attachments'
'/api/v1/attachments',
'/api/v1/metrics'
]
const URL_CASE_INSENSITIVE_REGEX: RegExp = /\/api\/v1\//i
const URL_CASE_SENSITIVE_REGEX: RegExp = /\/api\/v1\//
Expand Down Expand Up @@ -203,6 +208,28 @@ export class App {
})
}

if (process.env.ENABLE_METRICS === 'true') {
switch (process.env.METRICS_PROVIDER) {
// default to prometheus
case 'prometheus':
case undefined:
this.metricsProvider = new Prometheus(this.app)
break
case 'open_telemetry':
this.metricsProvider = new OpenTelemetry(this.app)
break
// add more cases for other metrics providers here
}
if (this.metricsProvider) {
await this.metricsProvider.initializeCounters()
logger.info(`📊 [server]: Metrics Provider [${this.metricsProvider.getName()}] has been initialized!`)
} else {
logger.error(
"❌ [server]: Metrics collection is enabled, but failed to initialize provider (valid values are 'prometheus' or 'open_telemetry'."
)
}
}

this.app.use('/api/v1', flowiseApiV1Router)
this.sseStreamer = new SSEStreamer(this.app)

Expand Down
157 changes: 157 additions & 0 deletions packages/server/src/metrics/OpenTelemetry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import { FLOWISE_COUNTER, IMetricsProvider } from '../Interface.Metrics'
import { Resource } from '@opentelemetry/resources'
import { ATTR_SERVICE_NAME, ATTR_SERVICE_VERSION } from '@opentelemetry/semantic-conventions'
import { MeterProvider, PeriodicExportingMetricReader, Histogram } from '@opentelemetry/sdk-metrics'
import { diag, DiagLogLevel, DiagConsoleLogger, Attributes, Counter } from '@opentelemetry/api'
import { getVersion } from 'flowise-components'
import express from 'express'

export class OpenTelemetry implements IMetricsProvider {
private app: express.Application
private resource: Resource
private otlpMetricExporter: any
// private otlpTraceExporter: any
// private tracerProvider: NodeTracerProvider
private metricReader: PeriodicExportingMetricReader
private meterProvider: MeterProvider

// Map to hold all counters and histograms
private counters = new Map<string, Counter | Histogram>()
private httpRequestCounter: Counter
private httpRequestDuration: any

constructor(app: express.Application) {
this.app = app

if (!process.env.METRICS_OPEN_TELEMETRY_METRIC_ENDPOINT) {
throw new Error('METRICS_OPEN_TELEMETRY_METRIC_ENDPOINT is not defined')
}

if (process.env.METRICS_OPEN_TELEMETRY_DEBUG === 'true') {
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG)
}
}

public getName(): string {
return 'OpenTelemetry'
}

async initializeCounters(): Promise<void> {
// Define the resource with the service name for trace grouping
const flowiseVersion = await getVersion()

this.resource = new Resource({
[ATTR_SERVICE_NAME]: process.env.METRICS_SERVICE_NAME || 'FlowiseAI',
[ATTR_SERVICE_VERSION]: flowiseVersion.version // Version as a label
})

const metricProtocol = process.env.METRICS_OPEN_TELEMETRY_PROTOCOL || 'http' // Default to 'http'
// Conditionally import the correct OTLP exporters based on protocol
let OTLPMetricExporter
if (metricProtocol === 'http') {
OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-http').OTLPMetricExporter
} else if (metricProtocol === 'grpc') {
OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-grpc').OTLPMetricExporter
} else if (metricProtocol === 'proto') {
OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-proto').OTLPMetricExporter
} else {
console.error('Invalid METRICS_OPEN_TELEMETRY_PROTOCOL specified. Please set it to "http", "grpc", or "proto".')
process.exit(1) // Exit if invalid protocol type is specified
}

this.otlpMetricExporter = new OTLPMetricExporter({
url: process.env.METRICS_OPEN_TELEMETRY_METRIC_ENDPOINT // OTLP endpoint for metrics
})

this.metricReader = new PeriodicExportingMetricReader({
exporter: this.otlpMetricExporter,
exportIntervalMillis: 5000 // Export metrics every 5 seconds
})
this.meterProvider = new MeterProvider({ resource: this.resource, readers: [this.metricReader] })

const meter = this.meterProvider.getMeter('flowise-metrics')
// look at the FLOWISE_COUNTER enum in Interface.Metrics.ts and get all values
// for each counter in the enum, create a new promClient.Counter and add it to the registry
const enumEntries = Object.entries(FLOWISE_COUNTER)
enumEntries.forEach(([name, value]) => {
// derive proper counter name from the enum value (chatflow_created = Chatflow Created)
const properCounterName: string = name.replace(/_/g, ' ').replace(/\b\w/g, (l) => l.toUpperCase())
this.counters.set(
value,
meter.createCounter(value, {
description: properCounterName
})
)
})

// in addition to the enum counters, add a few more custom counters

const versionGuage = meter.createGauge('flowise_version', {
description: 'Flowise version'
})
// remove the last dot from the version string, e.g. 2.1.3 -> 2.13 (guage needs a number - float)
const formattedVersion = flowiseVersion.version.replace(/\.(\d+)$/, '$1')
versionGuage.record(parseFloat(formattedVersion))

// Counter for HTTP requests with method, path, and status as labels
this.httpRequestCounter = meter.createCounter('http_requests_total', {
description: 'Counts the number of HTTP requests received'
})

// Histogram to measure HTTP request duration in milliseconds
this.httpRequestDuration = meter.createHistogram('http_request_duration_ms', {
description: 'Records the duration of HTTP requests in ms'
})
}

// Function to record HTTP request duration
private recordHttpRequestDuration(durationMs: number, method: string, path: string, status: number) {
this.httpRequestDuration.record(durationMs, {
method,
path,
status: status.toString()
})
}

// Function to record HTTP requests with specific labels
private recordHttpRequest(method: string, path: string, status: number) {
this.httpRequestCounter.add(1, {
method,
path,
status: status.toString()
})
}

async setupMetricsEndpoint(): Promise<void> {
// Graceful shutdown for telemetry data flushing
process.on('SIGTERM', async () => {
await this.metricReader.shutdown()
await this.meterProvider.shutdown()
})

// Runs before each requests
this.app.use((req, res, next) => {
res.locals.startEpoch = Date.now()
next()
})

// Runs after each requests
this.app.use((req, res, next) => {
res.on('finish', async () => {
if (res.locals.startEpoch) {
const responseTimeInMs = Date.now() - res.locals.startEpoch
this.recordHttpRequest(req.method, req.path, res.statusCode)
this.recordHttpRequestDuration(responseTimeInMs, req.method, req.path, res.statusCode)
}
})
next()
})
}

async incrementCounter(counter: string, payload: any): Promise<void> {
// Increment OpenTelemetry counter with the payload
if (this.counters.has(counter)) {
;(this.counters.get(counter) as Counter<Attributes>).add(1, payload)
}
}
}
Loading