diff --git a/.chloggen/postgres-stored-procedure-measurements.yaml b/.chloggen/postgres-stored-procedure-measurements.yaml new file mode 100644 index 0000000000000..8534175e5df41 --- /dev/null +++ b/.chloggen/postgres-stored-procedure-measurements.yaml @@ -0,0 +1,32 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: receiver/postgresql + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add support for collecting stored procedure execution statistics in PostgreSQL receiver" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [0] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + + - Introduced new configuration options to enable stored procedure metrics collection. + - Implemented SQL queries to gather execution statistics from PostgreSQL system catalogs. + - Added unit and integration tests to validate the new functionality. + - Merged changes from HEAD + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/postgresqlreceiver/client.go b/receiver/postgresqlreceiver/client.go index 085672dded4a1..d122278d31210 100644 --- a/receiver/postgresqlreceiver/client.go +++ b/receiver/postgresqlreceiver/client.go @@ -972,6 +972,8 @@ func (c *postgreSQLClient) getQuerySamples(ctx context.Context, limit int64, new currentAttributes[string(semconv.DBNamespaceKey)] = row[querySampleColumnDatname] currentAttributes[string(semconv.UserNameKey)] = row[querySampleColumnUsename] currentAttributes[postgresqlTotalExecTimeAttributeName] = duration + currentAttributes[dbAttributePrefix+querySampleColumnProcedureID] = row[querySampleColumnProcedureID] + currentAttributes[dbAttributePrefix+querySampleColumnProcedureName] = row[querySampleColumnProcedureName] finalAttributes = append(finalAttributes, currentAttributes) } diff --git a/receiver/postgresqlreceiver/consts.go b/receiver/postgresqlreceiver/consts.go index 4ce27dc8c8443..973a32c876097 100644 --- a/receiver/postgresqlreceiver/consts.go +++ b/receiver/postgresqlreceiver/consts.go @@ -35,6 +35,8 @@ const ( querySampleColumnUsename = "usename" querySampleColumnWaitEvent = "wait_event" querySampleColumnWaitEventType = "wait_event_type" + querySampleColumnProcedureID = "procedure_id" + querySampleColumnProcedureName = "procedure_name" ) const ( diff --git a/receiver/postgresqlreceiver/documentation.md b/receiver/postgresqlreceiver/documentation.md index e3fd6f819881a..a9668202f58e1 100644 --- a/receiver/postgresqlreceiver/documentation.md +++ b/receiver/postgresqlreceiver/documentation.md @@ -422,6 +422,8 @@ query sample | postgresql.wait_event_type | The type of event for which the backend is waiting, if any; otherwise NULL. | Any Str | | postgresql.query_id | Identifier of this backend's most recent query. If state is active this field shows the identifier of the currently executing query. In all other states, it shows the identifier of last query that was executed. | Any Str | | postgresql.total_exec_time | Total time spent executing the statement, in delta milliseconds. | Any Double | +| postgresql.procedure_id | Identifier of the stored procedure. | Any Str | +| postgresql.procedure_name | Name of the stored procedure. | Any Str | ### db.server.top_query diff --git a/receiver/postgresqlreceiver/integration_test.go b/receiver/postgresqlreceiver/integration_test.go index cb5364b2502ef..96a57d8e36ab9 100644 --- a/receiver/postgresqlreceiver/integration_test.go +++ b/receiver/postgresqlreceiver/integration_test.go @@ -11,6 +11,7 @@ import ( "net" "path/filepath" "strings" + "sync" "testing" "time" @@ -188,19 +189,16 @@ func TestScrapeLogsFromContainer(t *testing.T) { WithOccurrence(1), }, }) + defer testcontainers.CleanupContainer(t, ci) assert.NoError(t, err) err = ci.Start(t.Context()) assert.NoError(t, err) - defer testcontainers.CleanupContainer(t, ci) p, err := ci.MappedPort(t.Context(), postgresqlPort) assert.NoError(t, err) connStr := fmt.Sprintf("postgres://root:otel@localhost:%s/otel2?sslmode=disable", p.Port()) db, err := sql.Open("postgres", connStr) assert.NoError(t, err) - - _, err = db.Exec("Select * from test2 where id = 67") - assert.NoError(t, err) defer db.Close() cfg := Config{ @@ -225,6 +223,11 @@ func TestScrapeLogsFromContainer(t *testing.T) { Logger: zap.Must(zap.NewProduction()), }, }, &cfg, clientFactory, newCache(1), newTTLCache[string](1000, time.Second)) + // run an example query in a separate thread so that it is picked up by the log scraper + // Note that it is ASSUMED that the scraper will run within 1 second of this query being run, but it is not + // guaranteed, so adjustments may be appropriate later + waitGroup := sync.WaitGroup{} + go testQuery(t, db, "Select * from test2 where id = 67; select pg_sleep(1);", &waitGroup) plogs, err := ns.scrapeQuerySamples(t.Context(), 30) assert.NoError(t, err) logRecords := plogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords() @@ -237,7 +240,7 @@ func TestScrapeLogsFromContainer(t *testing.T) { if !strings.HasPrefix(query, "select * from test2") { continue } - assert.Equal(t, "select * from test2 where id = ?", query) + assert.Equal(t, "select * from test2 where id = ? select pg_sleep ( ? )", query) databaseAttribute, ok := attributes["db.namespace"] assert.True(t, ok) assert.Equal(t, "otel2", databaseAttribute.(string)) @@ -294,5 +297,15 @@ func TestScrapeLogsFromContainer(t *testing.T) { assert.Equal(t, int64(2), calls.(int64)) found = true } + assert.True(t, found, "Expected to find a log record with the query text from the first time top query") + // wait for the query goroutine to finish + waitGroup.Wait() +} + +func testQuery(t *testing.T, db *sql.DB, query string, waitGroup *sync.WaitGroup) { + waitGroup.Add(1) + _, err := db.Exec(query) + assert.NoError(t, err) + waitGroup.Done() } diff --git a/receiver/postgresqlreceiver/internal/metadata/generated_logs.go b/receiver/postgresqlreceiver/internal/metadata/generated_logs.go index c8b5859781a6b..2e4c63ca95e2e 100644 --- a/receiver/postgresqlreceiver/internal/metadata/generated_logs.go +++ b/receiver/postgresqlreceiver/internal/metadata/generated_logs.go @@ -18,7 +18,7 @@ type eventDbServerQuerySample struct { config EventConfig // event config provided by user. } -func (e *eventDbServerQuerySample) recordEvent(ctx context.Context, timestamp pcommon.Timestamp, dbSystemNameAttributeValue string, dbNamespaceAttributeValue string, dbQueryTextAttributeValue string, userNameAttributeValue string, postgresqlStateAttributeValue string, postgresqlPidAttributeValue int64, postgresqlApplicationNameAttributeValue string, networkPeerAddressAttributeValue string, networkPeerPortAttributeValue int64, postgresqlClientHostnameAttributeValue string, postgresqlQueryStartAttributeValue string, postgresqlWaitEventAttributeValue string, postgresqlWaitEventTypeAttributeValue string, postgresqlQueryIDAttributeValue string, postgresqlTotalExecTimeAttributeValue float64) { +func (e *eventDbServerQuerySample) recordEvent(ctx context.Context, timestamp pcommon.Timestamp, dbSystemNameAttributeValue string, dbNamespaceAttributeValue string, dbQueryTextAttributeValue string, userNameAttributeValue string, postgresqlStateAttributeValue string, postgresqlPidAttributeValue int64, postgresqlApplicationNameAttributeValue string, networkPeerAddressAttributeValue string, networkPeerPortAttributeValue int64, postgresqlClientHostnameAttributeValue string, postgresqlQueryStartAttributeValue string, postgresqlWaitEventAttributeValue string, postgresqlWaitEventTypeAttributeValue string, postgresqlQueryIDAttributeValue string, postgresqlTotalExecTimeAttributeValue float64, postgresqlProcedureIDAttributeValue string, postgresqlProcedureNameAttributeValue string) { if !e.config.Enabled { return } @@ -45,6 +45,8 @@ func (e *eventDbServerQuerySample) recordEvent(ctx context.Context, timestamp pc dp.Attributes().PutStr("postgresql.wait_event_type", postgresqlWaitEventTypeAttributeValue) dp.Attributes().PutStr("postgresql.query_id", postgresqlQueryIDAttributeValue) dp.Attributes().PutDouble("postgresql.total_exec_time", postgresqlTotalExecTimeAttributeValue) + dp.Attributes().PutStr("postgresql.procedure_id", postgresqlProcedureIDAttributeValue) + dp.Attributes().PutStr("postgresql.procedure_name", postgresqlProcedureNameAttributeValue) } @@ -255,8 +257,8 @@ func (lb *LogsBuilder) Emit(options ...ResourceLogsOption) plog.Logs { } // RecordDbServerQuerySampleEvent adds a log record of db.server.query_sample event. -func (lb *LogsBuilder) RecordDbServerQuerySampleEvent(ctx context.Context, timestamp pcommon.Timestamp, dbSystemNameAttributeValue AttributeDbSystemName, dbNamespaceAttributeValue string, dbQueryTextAttributeValue string, userNameAttributeValue string, postgresqlStateAttributeValue string, postgresqlPidAttributeValue int64, postgresqlApplicationNameAttributeValue string, networkPeerAddressAttributeValue string, networkPeerPortAttributeValue int64, postgresqlClientHostnameAttributeValue string, postgresqlQueryStartAttributeValue string, postgresqlWaitEventAttributeValue string, postgresqlWaitEventTypeAttributeValue string, postgresqlQueryIDAttributeValue string, postgresqlTotalExecTimeAttributeValue float64) { - lb.eventDbServerQuerySample.recordEvent(ctx, timestamp, dbSystemNameAttributeValue.String(), dbNamespaceAttributeValue, dbQueryTextAttributeValue, userNameAttributeValue, postgresqlStateAttributeValue, postgresqlPidAttributeValue, postgresqlApplicationNameAttributeValue, networkPeerAddressAttributeValue, networkPeerPortAttributeValue, postgresqlClientHostnameAttributeValue, postgresqlQueryStartAttributeValue, postgresqlWaitEventAttributeValue, postgresqlWaitEventTypeAttributeValue, postgresqlQueryIDAttributeValue, postgresqlTotalExecTimeAttributeValue) +func (lb *LogsBuilder) RecordDbServerQuerySampleEvent(ctx context.Context, timestamp pcommon.Timestamp, dbSystemNameAttributeValue AttributeDbSystemName, dbNamespaceAttributeValue string, dbQueryTextAttributeValue string, userNameAttributeValue string, postgresqlStateAttributeValue string, postgresqlPidAttributeValue int64, postgresqlApplicationNameAttributeValue string, networkPeerAddressAttributeValue string, networkPeerPortAttributeValue int64, postgresqlClientHostnameAttributeValue string, postgresqlQueryStartAttributeValue string, postgresqlWaitEventAttributeValue string, postgresqlWaitEventTypeAttributeValue string, postgresqlQueryIDAttributeValue string, postgresqlTotalExecTimeAttributeValue float64, postgresqlProcedureIDAttributeValue string, postgresqlProcedureNameAttributeValue string) { + lb.eventDbServerQuerySample.recordEvent(ctx, timestamp, dbSystemNameAttributeValue.String(), dbNamespaceAttributeValue, dbQueryTextAttributeValue, userNameAttributeValue, postgresqlStateAttributeValue, postgresqlPidAttributeValue, postgresqlApplicationNameAttributeValue, networkPeerAddressAttributeValue, networkPeerPortAttributeValue, postgresqlClientHostnameAttributeValue, postgresqlQueryStartAttributeValue, postgresqlWaitEventAttributeValue, postgresqlWaitEventTypeAttributeValue, postgresqlQueryIDAttributeValue, postgresqlTotalExecTimeAttributeValue, postgresqlProcedureIDAttributeValue, postgresqlProcedureNameAttributeValue) } // RecordDbServerTopQueryEvent adds a log record of db.server.top_query event. diff --git a/receiver/postgresqlreceiver/internal/metadata/generated_logs_test.go b/receiver/postgresqlreceiver/internal/metadata/generated_logs_test.go index eb98acb06aca3..de10b14858c7f 100644 --- a/receiver/postgresqlreceiver/internal/metadata/generated_logs_test.go +++ b/receiver/postgresqlreceiver/internal/metadata/generated_logs_test.go @@ -133,7 +133,7 @@ func TestLogsBuilder(t *testing.T) { allEventsCount := 0 defaultEventsCount++ allEventsCount++ - lb.RecordDbServerQuerySampleEvent(ctx, timestamp, AttributeDbSystemNamePostgresql, "db.namespace-val", "db.query.text-val", "user.name-val", "postgresql.state-val", 14, "postgresql.application_name-val", "network.peer.address-val", 17, "postgresql.client_hostname-val", "postgresql.query_start-val", "postgresql.wait_event-val", "postgresql.wait_event_type-val", "postgresql.query_id-val", 26.100000) + lb.RecordDbServerQuerySampleEvent(ctx, timestamp, AttributeDbSystemNamePostgresql, "db.namespace-val", "db.query.text-val", "user.name-val", "postgresql.state-val", 14, "postgresql.application_name-val", "network.peer.address-val", 17, "postgresql.client_hostname-val", "postgresql.query_start-val", "postgresql.wait_event-val", "postgresql.wait_event_type-val", "postgresql.query_id-val", 26.100000, "postgresql.procedure_id-val", "postgresql.procedure_name-val") defaultEventsCount++ allEventsCount++ lb.RecordDbServerTopQueryEvent(ctx, timestamp, AttributeDbSystemNamePostgresql, "db.namespace-val", "db.query.text-val", 16, 15, 30, 26, 27, 30, 25, 28, "postgresql.queryid-val", "postgresql.rolname-val", 26.100000, 26.100000, "postgresql.query_plan-val") @@ -218,6 +218,12 @@ func TestLogsBuilder(t *testing.T) { attrVal, ok = lr.Attributes().Get("postgresql.total_exec_time") assert.True(t, ok) assert.Equal(t, 26.100000, attrVal.Double()) + attrVal, ok = lr.Attributes().Get("postgresql.procedure_id") + assert.True(t, ok) + assert.Equal(t, "postgresql.procedure_id-val", attrVal.Str()) + attrVal, ok = lr.Attributes().Get("postgresql.procedure_name") + assert.True(t, ok) + assert.Equal(t, "postgresql.procedure_name-val", attrVal.Str()) case "db.server.top_query": assert.False(t, validatedEvents["db.server.top_query"], "Found a duplicate in the events slice: db.server.top_query") validatedEvents["db.server.top_query"] = true diff --git a/receiver/postgresqlreceiver/metadata.yaml b/receiver/postgresqlreceiver/metadata.yaml index 2b10bda3985bb..a35acdab001d5 100644 --- a/receiver/postgresqlreceiver/metadata.yaml +++ b/receiver/postgresqlreceiver/metadata.yaml @@ -98,6 +98,12 @@ attributes: postgresql.pid: description: Process ID of this backend. type: int + postgresql.procedure_id: + description: Identifier of the stored procedure. + type: string + postgresql.procedure_name: + description: Name of the stored procedure. + type: string postgresql.query_id: description: Identifier of this backend's most recent query. If state is active this field shows the identifier of the currently executing query. In all other states, it shows the identifier of last query that was executed. type: string @@ -200,6 +206,8 @@ events: - postgresql.wait_event_type - postgresql.query_id - postgresql.total_exec_time + - postgresql.procedure_id + - postgresql.procedure_name db.server.top_query: enabled: true description: top query diff --git a/receiver/postgresqlreceiver/scraper.go b/receiver/postgresqlreceiver/scraper.go index 4aed590e2fc43..595688246a266 100644 --- a/receiver/postgresqlreceiver/scraper.go +++ b/receiver/postgresqlreceiver/scraper.go @@ -243,6 +243,8 @@ func (p *postgreSQLScraper) collectQuerySamples(ctx context.Context, dbClient cl atts[dbAttributePrefix+querySampleColumnWaitEventType].(string), atts[dbAttributePrefix+querySampleColumnQueryID].(string), atts[postgresqlTotalExecTimeAttributeName].(float64), + atts[dbAttributePrefix+querySampleColumnProcedureID].(string), + atts[dbAttributePrefix+querySampleColumnProcedureName].(string), ) } } diff --git a/receiver/postgresqlreceiver/scraper_test.go b/receiver/postgresqlreceiver/scraper_test.go index c54ddc2946857..2a639cf6913ee 100644 --- a/receiver/postgresqlreceiver/scraper_test.go +++ b/receiver/postgresqlreceiver/scraper_test.go @@ -422,6 +422,8 @@ var querySampleColumns = []string{ querySampleColumnState, querySampleColumnQuery, querySampleColumnDurationMilliseconds, + querySampleColumnProcedureID, + querySampleColumnProcedureName, } func newQuerySampleRows(t *testing.T, values map[string]any) *sqlmock.Rows { @@ -474,6 +476,10 @@ func TestScrapeQuerySample(t *testing.T) { querySampleColumnState: "idle", querySampleColumnQuery: "select * from pg_stat_activity where id = 32", querySampleColumnDurationMilliseconds: "1.2", + querySampleColumnProcedureID: "123", + querySampleColumnProcedureName: "imaprocedure", + querySampleColumnWaitEventType: "", + querySampleColumnWaitEvent: "", })) actualLogs, err := scraper.scrapeQuerySamples(t.Context(), 30) assert.NoError(t, err) diff --git a/receiver/postgresqlreceiver/templates/querySampleTemplate.tmpl b/receiver/postgresqlreceiver/templates/querySampleTemplate.tmpl index 931f4646e25de..55fea13b919c3 100644 --- a/receiver/postgresqlreceiver/templates/querySampleTemplate.tmpl +++ b/receiver/postgresqlreceiver/templates/querySampleTemplate.tmpl @@ -1,36 +1,79 @@ -SELECT - COALESCE(datname, '') AS datname, - COALESCE(usename, '') AS usename, - COALESCE(client_addr::TEXT, '') AS client_addr, - COALESCE(client_hostname, '') AS client_hostname, - COALESCE(client_port::TEXT, '') AS client_port, - COALESCE(query_start::TEXT, '') AS query_start, - COALESCE(wait_event_type, '') AS wait_event_type, - COALESCE(wait_event, '') AS wait_event, - COALESCE(query_id::TEXT, '') AS query_id, - COALESCE(pid::TEXT, '') AS pid, - COALESCE(application_name::TEXT, '') AS application_name, - EXTRACT(EPOCH FROM query_start) AS _query_start_timestamp, - state, - query, - CASE - WHEN state = 'active' THEN - EXTRACT(EPOCH FROM (clock_timestamp() - query_start)) * 1e3 - WHEN state IN ('idle','idle in transaction','idle in transaction (aborted)') - AND state_change IS NOT NULL THEN - EXTRACT(EPOCH FROM (state_change - query_start)) * 1e3 - ELSE - NULL - END AS duration_ms -FROM pg_stat_activity -WHERE - coalesce( - TRIM(query), - '' - ) != '' - AND NOT ( -{{/* we only want query that is either still running that was started before our newestQueryTimestamp or idle but start after the newestQueryTimestamp */}} - query_start < TO_TIMESTAMP({{ .newestQueryTimestamp }}) - AND state = 'idle' - ) -LIMIT {{ .limit }}; \ No newline at end of file +WITH base AS ( + SELECT + pid, + datname, + usename, + client_addr::text AS client_addr, + client_hostname, + client_port, + query_start, + wait_event_type, + wait_event, + query_id, + application_name, + state, + query, + EXTRACT(EPOCH FROM query_start) AS _query_start_timestamp, + EXTRACT(EPOCH FROM (clock_timestamp() - query_start)) * 1e3 AS duration_ms, + -- Identify potential CALL statements and normalize the called name + CASE + WHEN UPPER(LTRIM(query)) LIKE 'CALL %' THEN + LOWER( + split_part( + regexp_replace(LTRIM(query), '^CALL\s+', '', 'i'), + '(', + 1 + ) + ) + ELSE NULL + END AS called_proc_name + FROM pg_stat_activity + WHERE COALESCE(TRIM(query), '') <> '' + AND state = 'active' +), + procs AS ( + SELECT + p.oid AS proc_oid, + n.nspname AS proc_schema, + p.proname AS proc_name, + LOWER(p.proname) AS proc_name_lower, + LOWER(n.nspname || '.' || p.proname) AS proc_qualified_lower + FROM pg_proc p + JOIN pg_namespace n ON n.oid = p.pronamespace + -- WHERE p.prokind = 'p' -- procedures only (not functions) + ) +SELECT + COALESCE(b.datname, '') AS datname, + COALESCE(b.usename, '') AS usename, + COALESCE(b.client_addr, '') AS client_addr, + COALESCE(b.client_hostname, '') AS client_hostname, + COALESCE(b.client_port::text, '') AS client_port, + COALESCE(b.query_start::text, '') AS query_start, + COALESCE(b.wait_event_type, '') AS wait_event_type, + COALESCE(b.wait_event, '') AS wait_event, + COALESCE(b.query_id::text, '') AS query_id, + COALESCE(b.pid::text, '') AS pid, + COALESCE(b.application_name, '') AS application_name, + b._query_start_timestamp, + b.duration_ms, + COALESCE(b.state, '') AS state, + COALESCE(b.query, '') AS query, + -- New: procedure id (oid as text, empty string if none) + COALESCE(p.proc_oid::text, '0') AS procedure_id, + -- Existing: schema.name if a procedure is identified + COALESCE( + CASE + WHEN p.proc_schema IS NOT NULL AND p.proc_name IS NOT NULL + THEN p.proc_schema || '.' || p.proc_name + ELSE '' + END, + '' + ) AS procedure_name +FROM base b + LEFT JOIN procs p + ON b.called_proc_name IS NOT NULL + AND ( + b.called_proc_name = p.proc_name_lower + OR b.called_proc_name = p.proc_qualified_lower + ) +LIMIT {{.limit}}; diff --git a/receiver/postgresqlreceiver/testdata/scraper/query-sample/expected.yaml b/receiver/postgresqlreceiver/testdata/scraper/query-sample/expected.yaml index d090e1794d763..069893dc7c6be 100644 --- a/receiver/postgresqlreceiver/testdata/scraper/query-sample/expected.yaml +++ b/receiver/postgresqlreceiver/testdata/scraper/query-sample/expected.yaml @@ -48,6 +48,12 @@ resourceLogs: - key: postgresql.total_exec_time value: doubleValue: 1.2 + - key: postgresql.procedure_id + value: + stringValue: "123" + - key: postgresql.procedure_name + value: + stringValue: "imaprocedure" body: {} eventName: db.server.query_sample timeUnixNano: "1758612388939691000" diff --git a/receiver/postgresqlreceiver/testdata/scraper/query-sample/expectedSql.sql b/receiver/postgresqlreceiver/testdata/scraper/query-sample/expectedSql.sql index 088c6ede45739..4e8fff992e078 100644 --- a/receiver/postgresqlreceiver/testdata/scraper/query-sample/expectedSql.sql +++ b/receiver/postgresqlreceiver/testdata/scraper/query-sample/expectedSql.sql @@ -1,37 +1,79 @@ -SELECT - COALESCE(datname, '') AS datname, - COALESCE(usename, '') AS usename, - COALESCE(client_addr::TEXT, '') AS client_addr, - COALESCE(client_hostname, '') AS client_hostname, - COALESCE(client_port::TEXT, '') AS client_port, - COALESCE(query_start::TEXT, '') AS query_start, - COALESCE(wait_event_type, '') AS wait_event_type, - COALESCE(wait_event, '') AS wait_event, - COALESCE(query_id::TEXT, '') AS query_id, - COALESCE(pid::TEXT, '') AS pid, - COALESCE(application_name::TEXT, '') AS application_name, - EXTRACT(EPOCH FROM query_start) AS _query_start_timestamp, - state, - query, - CASE - WHEN state = 'active' THEN - EXTRACT(EPOCH FROM (clock_timestamp() - query_start)) * 1e3 - WHEN state IN ('idle','idle in transaction','idle in transaction (aborted)') - AND state_change IS NOT NULL THEN - EXTRACT(EPOCH FROM (state_change - query_start)) * 1e3 - ELSE - NULL - END AS duration_ms -FROM pg_stat_activity -WHERE - coalesce( - TRIM(query), - '' - ) != '' - AND NOT ( - - query_start < TO_TIMESTAMP(123440.111) - AND state = 'idle' - ) -LIMIT 30; - +WITH base AS ( + SELECT + pid, + datname, + usename, + client_addr::text AS client_addr, + client_hostname, + client_port, + query_start, + wait_event_type, + wait_event, + query_id, + application_name, + state, + query, + EXTRACT(EPOCH FROM query_start) AS _query_start_timestamp, + EXTRACT(EPOCH FROM (clock_timestamp() - query_start)) * 1e3 AS duration_ms, + -- Identify potential CALL statements and normalize the called name + CASE + WHEN UPPER(LTRIM(query)) LIKE 'CALL %' THEN + LOWER( + split_part( + regexp_replace(LTRIM(query), '^CALL\s+', '', 'i'), + '(', + 1 + ) + ) + ELSE NULL + END AS called_proc_name + FROM pg_stat_activity + WHERE COALESCE(TRIM(query), '') <> '' + AND state = 'active' +), + procs AS ( + SELECT + p.oid AS proc_oid, + n.nspname AS proc_schema, + p.proname AS proc_name, + LOWER(p.proname) AS proc_name_lower, + LOWER(n.nspname || '.' || p.proname) AS proc_qualified_lower + FROM pg_proc p + JOIN pg_namespace n ON n.oid = p.pronamespace + -- WHERE p.prokind = 'p' -- procedures only (not functions) + ) +SELECT + COALESCE(b.datname, '') AS datname, + COALESCE(b.usename, '') AS usename, + COALESCE(b.client_addr, '') AS client_addr, + COALESCE(b.client_hostname, '') AS client_hostname, + COALESCE(b.client_port::text, '') AS client_port, + COALESCE(b.query_start::text, '') AS query_start, + COALESCE(b.wait_event_type, '') AS wait_event_type, + COALESCE(b.wait_event, '') AS wait_event, + COALESCE(b.query_id::text, '') AS query_id, + COALESCE(b.pid::text, '') AS pid, + COALESCE(b.application_name, '') AS application_name, + b._query_start_timestamp, + b.duration_ms, + COALESCE(b.state, '') AS state, + COALESCE(b.query, '') AS query, + -- New: procedure id (oid as text, empty string if none) + COALESCE(p.proc_oid::text, '0') AS procedure_id, + -- Existing: schema.name if a procedure is identified + COALESCE( + CASE + WHEN p.proc_schema IS NOT NULL AND p.proc_name IS NOT NULL + THEN p.proc_schema || '.' || p.proc_name + ELSE '' + END, + '' + ) AS procedure_name +FROM base b + LEFT JOIN procs p + ON b.called_proc_name IS NOT NULL + AND ( + b.called_proc_name = p.proc_name_lower + OR b.called_proc_name = p.proc_qualified_lower + ) + LIMIT 30;