Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c5d313a
Integrating stored procedure values for samples
cjksplunk Dec 2, 2025
4609008
Merge branch 'upstream-main' into postgres-stored-procedure-stats
cjksplunk Dec 2, 2025
0a05a2b
Merge branch 'upstream-main' into postgres-stored-procedure-stats
cjksplunk Dec 2, 2025
a0482e2
Merge branch 'upstream-main' into postgres-stored-procedure-stats
cjksplunk Dec 2, 2025
2d521f1
Merge branch 'main' into postgres-stored-procedure-stats
cjksplunk Dec 2, 2025
64358a9
added changelog
cjksplunk Dec 2, 2025
eb14843
added changelog
cjksplunk Dec 2, 2025
0af7c10
Updates to testing
cjksplunk Dec 3, 2025
6b062bf
Merge branch 'upstream-main' into postgres-stored-procedure-stats
cjksplunk Dec 3, 2025
bfa4ff4
Updated samples query
cjksplunk Dec 3, 2025
4269926
Removed state='active' from query to satisfy existing tests
cjksplunk Dec 3, 2025
8c8092d
Merge branch 'upstream-main' into postgres-stored-procedure-stats
cjksplunk Dec 4, 2025
4d360d9
Merge branch 'upstream-main' into postgres-stored-procedure-stats
cjksplunk Dec 8, 2025
961bdcb
updated the regex syntax
cjksplunk Dec 9, 2025
2474df3
Updated expected SQL for samples
cjksplunk Dec 9, 2025
9500141
Merge branch 'upstream-main' into postgres-stored-procedure-stats
cjksplunk Dec 15, 2025
e7b5e29
Merge branch 'upstream-main' into postgres-stored-procedure-stats
cjksplunk Dec 16, 2025
bc9ecd9
Updated to capture only 'active' samples
cjksplunk Dec 16, 2025
533867b
Merge branch 'upstream-main' into postgres-stored-procedure-stats
cjksplunk Dec 16, 2025
74ffa21
Updated test per linter
cjksplunk Dec 16, 2025
85bd9d2
Merge branch 'upstream-main' into postgres-stored-procedure-stats
cjksplunk Dec 17, 2025
84efc9c
Merge branch 'upstream-main' into postgres-stored-procedure-stats
cjksplunk Jan 5, 2026
593e488
Merged changes from HEAD
cjksplunk Jan 5, 2026
9b4d3ed
Merge branch 'upstream-main' into postgres-stored-procedure-stats
cjksplunk Jan 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions .chloggen/postgres-stored-procedure-measurements.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/postgresql

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add support for collecting stored procedure execution statistics in PostgreSQL receiver"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [0]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |

- Introduced new configuration options to enable stored procedure metrics collection.
- Implemented SQL queries to gather execution statistics from PostgreSQL system catalogs.
- Added unit and integration tests to validate the new functionality.
- Merged changes from HEAD

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 2 additions & 0 deletions receiver/postgresqlreceiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,8 @@ func (c *postgreSQLClient) getQuerySamples(ctx context.Context, limit int64, new
currentAttributes[string(semconv.DBNamespaceKey)] = row[querySampleColumnDatname]
currentAttributes[string(semconv.UserNameKey)] = row[querySampleColumnUsename]
currentAttributes[postgresqlTotalExecTimeAttributeName] = duration
currentAttributes[dbAttributePrefix+querySampleColumnProcedureID] = row[querySampleColumnProcedureID]
currentAttributes[dbAttributePrefix+querySampleColumnProcedureName] = row[querySampleColumnProcedureName]
finalAttributes = append(finalAttributes, currentAttributes)
}

Expand Down
2 changes: 2 additions & 0 deletions receiver/postgresqlreceiver/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (
querySampleColumnUsename = "usename"
querySampleColumnWaitEvent = "wait_event"
querySampleColumnWaitEventType = "wait_event_type"
querySampleColumnProcedureID = "procedure_id"
querySampleColumnProcedureName = "procedure_name"
)

const (
Expand Down
2 changes: 2 additions & 0 deletions receiver/postgresqlreceiver/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,8 @@ query sample
| postgresql.wait_event_type | The type of event for which the backend is waiting, if any; otherwise NULL. | Any Str |
| postgresql.query_id | Identifier of this backend's most recent query. If state is active this field shows the identifier of the currently executing query. In all other states, it shows the identifier of last query that was executed. | Any Str |
| postgresql.total_exec_time | Total time spent executing the statement, in delta milliseconds. | Any Double |
| postgresql.procedure_id | Identifier of the stored procedure. | Any Str |
| postgresql.procedure_name | Name of the stored procedure. | Any Str |

### db.server.top_query

Expand Down
23 changes: 18 additions & 5 deletions receiver/postgresqlreceiver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net"
"path/filepath"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -188,19 +189,16 @@ func TestScrapeLogsFromContainer(t *testing.T) {
WithOccurrence(1),
},
})
defer testcontainers.CleanupContainer(t, ci)
assert.NoError(t, err)

err = ci.Start(t.Context())
assert.NoError(t, err)
defer testcontainers.CleanupContainer(t, ci)
p, err := ci.MappedPort(t.Context(), postgresqlPort)
assert.NoError(t, err)
connStr := fmt.Sprintf("postgres://root:otel@localhost:%s/otel2?sslmode=disable", p.Port())
db, err := sql.Open("postgres", connStr)
assert.NoError(t, err)

_, err = db.Exec("Select * from test2 where id = 67")
assert.NoError(t, err)
defer db.Close()

cfg := Config{
Expand All @@ -225,6 +223,11 @@ func TestScrapeLogsFromContainer(t *testing.T) {
Logger: zap.Must(zap.NewProduction()),
},
}, &cfg, clientFactory, newCache(1), newTTLCache[string](1000, time.Second))
// run an example query in a separate thread so that it is picked up by the log scraper
// Note that it is ASSUMED that the scraper will run within 1 second of this query being run, but it is not
// guaranteed, so adjustments may be appropriate later
waitGroup := sync.WaitGroup{}
go testQuery(t, db, "Select * from test2 where id = 67; select pg_sleep(1);", &waitGroup)
plogs, err := ns.scrapeQuerySamples(t.Context(), 30)
assert.NoError(t, err)
logRecords := plogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
Expand All @@ -237,7 +240,7 @@ func TestScrapeLogsFromContainer(t *testing.T) {
if !strings.HasPrefix(query, "select * from test2") {
continue
}
assert.Equal(t, "select * from test2 where id = ?", query)
assert.Equal(t, "select * from test2 where id = ? select pg_sleep ( ? )", query)
databaseAttribute, ok := attributes["db.namespace"]
assert.True(t, ok)
assert.Equal(t, "otel2", databaseAttribute.(string))
Expand Down Expand Up @@ -294,5 +297,15 @@ func TestScrapeLogsFromContainer(t *testing.T) {
assert.Equal(t, int64(2), calls.(int64))
found = true
}

assert.True(t, found, "Expected to find a log record with the query text from the first time top query")
// wait for the query goroutine to finish
waitGroup.Wait()
}

func testQuery(t *testing.T, db *sql.DB, query string, waitGroup *sync.WaitGroup) {
waitGroup.Add(1)
_, err := db.Exec(query)
assert.NoError(t, err)
waitGroup.Done()
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions receiver/postgresqlreceiver/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ attributes:
postgresql.pid:
description: Process ID of this backend.
type: int
postgresql.procedure_id:
description: Identifier of the stored procedure.
type: string
postgresql.procedure_name:
description: Name of the stored procedure.
type: string
postgresql.query_id:
description: Identifier of this backend's most recent query. If state is active this field shows the identifier of the currently executing query. In all other states, it shows the identifier of last query that was executed.
type: string
Expand Down Expand Up @@ -200,6 +206,8 @@ events:
- postgresql.wait_event_type
- postgresql.query_id
- postgresql.total_exec_time
- postgresql.procedure_id
- postgresql.procedure_name
db.server.top_query:
enabled: true
description: top query
Expand Down
2 changes: 2 additions & 0 deletions receiver/postgresqlreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ func (p *postgreSQLScraper) collectQuerySamples(ctx context.Context, dbClient cl
atts[dbAttributePrefix+querySampleColumnWaitEventType].(string),
atts[dbAttributePrefix+querySampleColumnQueryID].(string),
atts[postgresqlTotalExecTimeAttributeName].(float64),
atts[dbAttributePrefix+querySampleColumnProcedureID].(string),
atts[dbAttributePrefix+querySampleColumnProcedureName].(string),
)
}
}
Expand Down
6 changes: 6 additions & 0 deletions receiver/postgresqlreceiver/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,8 @@ var querySampleColumns = []string{
querySampleColumnState,
querySampleColumnQuery,
querySampleColumnDurationMilliseconds,
querySampleColumnProcedureID,
querySampleColumnProcedureName,
}

func newQuerySampleRows(t *testing.T, values map[string]any) *sqlmock.Rows {
Expand Down Expand Up @@ -474,6 +476,10 @@ func TestScrapeQuerySample(t *testing.T) {
querySampleColumnState: "idle",
querySampleColumnQuery: "select * from pg_stat_activity where id = 32",
querySampleColumnDurationMilliseconds: "1.2",
querySampleColumnProcedureID: "123",
querySampleColumnProcedureName: "imaprocedure",
querySampleColumnWaitEventType: "",
querySampleColumnWaitEvent: "",
}))
actualLogs, err := scraper.scrapeQuerySamples(t.Context(), 30)
assert.NoError(t, err)
Expand Down
115 changes: 79 additions & 36 deletions receiver/postgresqlreceiver/templates/querySampleTemplate.tmpl
Original file line number Diff line number Diff line change
@@ -1,36 +1,79 @@
SELECT
COALESCE(datname, '') AS datname,
COALESCE(usename, '') AS usename,
COALESCE(client_addr::TEXT, '') AS client_addr,
COALESCE(client_hostname, '') AS client_hostname,
COALESCE(client_port::TEXT, '') AS client_port,
COALESCE(query_start::TEXT, '') AS query_start,
COALESCE(wait_event_type, '') AS wait_event_type,
COALESCE(wait_event, '') AS wait_event,
COALESCE(query_id::TEXT, '') AS query_id,
COALESCE(pid::TEXT, '') AS pid,
COALESCE(application_name::TEXT, '') AS application_name,
EXTRACT(EPOCH FROM query_start) AS _query_start_timestamp,
state,
query,
CASE
WHEN state = 'active' THEN
EXTRACT(EPOCH FROM (clock_timestamp() - query_start)) * 1e3
WHEN state IN ('idle','idle in transaction','idle in transaction (aborted)')
AND state_change IS NOT NULL THEN
EXTRACT(EPOCH FROM (state_change - query_start)) * 1e3
ELSE
NULL
END AS duration_ms
FROM pg_stat_activity
WHERE
coalesce(
TRIM(query),
''
) != ''
AND NOT (
{{/* we only want query that is either still running that was started before our newestQueryTimestamp or idle but start after the newestQueryTimestamp */}}
query_start < TO_TIMESTAMP({{ .newestQueryTimestamp }})
AND state = 'idle'
)
LIMIT {{ .limit }};
WITH base AS (
SELECT
pid,
datname,
usename,
client_addr::text AS client_addr,
client_hostname,
client_port,
query_start,
wait_event_type,
wait_event,
query_id,
application_name,
state,
query,
EXTRACT(EPOCH FROM query_start) AS _query_start_timestamp,
EXTRACT(EPOCH FROM (clock_timestamp() - query_start)) * 1e3 AS duration_ms,
-- Identify potential CALL statements and normalize the called name
CASE
WHEN UPPER(LTRIM(query)) LIKE 'CALL %' THEN
LOWER(
split_part(
regexp_replace(LTRIM(query), '^CALL\s+', '', 'i'),
'(',
1
)
)
ELSE NULL
END AS called_proc_name
FROM pg_stat_activity
WHERE COALESCE(TRIM(query), '') <> ''
AND state = 'active'
),
procs AS (
SELECT
p.oid AS proc_oid,
n.nspname AS proc_schema,
p.proname AS proc_name,
LOWER(p.proname) AS proc_name_lower,
LOWER(n.nspname || '.' || p.proname) AS proc_qualified_lower
FROM pg_proc p
JOIN pg_namespace n ON n.oid = p.pronamespace
-- WHERE p.prokind = 'p' -- procedures only (not functions)
)
SELECT
COALESCE(b.datname, '') AS datname,
COALESCE(b.usename, '') AS usename,
COALESCE(b.client_addr, '') AS client_addr,
COALESCE(b.client_hostname, '') AS client_hostname,
COALESCE(b.client_port::text, '') AS client_port,
COALESCE(b.query_start::text, '') AS query_start,
COALESCE(b.wait_event_type, '') AS wait_event_type,
COALESCE(b.wait_event, '') AS wait_event,
COALESCE(b.query_id::text, '') AS query_id,
COALESCE(b.pid::text, '') AS pid,
COALESCE(b.application_name, '') AS application_name,
b._query_start_timestamp,
b.duration_ms,
COALESCE(b.state, '') AS state,
COALESCE(b.query, '') AS query,
-- New: procedure id (oid as text, empty string if none)
COALESCE(p.proc_oid::text, '0') AS procedure_id,
-- Existing: schema.name if a procedure is identified
COALESCE(
CASE
WHEN p.proc_schema IS NOT NULL AND p.proc_name IS NOT NULL
THEN p.proc_schema || '.' || p.proc_name
ELSE ''
END,
''
) AS procedure_name
FROM base b
LEFT JOIN procs p
ON b.called_proc_name IS NOT NULL
AND (
b.called_proc_name = p.proc_name_lower
OR b.called_proc_name = p.proc_qualified_lower
)
LIMIT {{.limit}};
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ resourceLogs:
- key: postgresql.total_exec_time
value:
doubleValue: 1.2
- key: postgresql.procedure_id
value:
stringValue: "123"
- key: postgresql.procedure_name
value:
stringValue: "imaprocedure"
body: {}
eventName: db.server.query_sample
timeUnixNano: "1758612388939691000"
Expand Down
Loading
Loading