Skip to content

[Metricbeat][SQL] Add cursor-based incremental data fetching to SQL module#48722

Open
shmsr wants to merge 61 commits intoelastic:mainfrom
shmsr:feat/sql-cursor-incremental-fetch
Open

[Metricbeat][SQL] Add cursor-based incremental data fetching to SQL module#48722
shmsr wants to merge 61 commits intoelastic:mainfrom
shmsr:feat/sql-cursor-incremental-fetch

Conversation

@shmsr
Copy link
Member

@shmsr shmsr commented Feb 6, 2026

Proposed commit message

Add a cursor feature to the SQL query metricset that enables incremental data fetching by tracking the last fetched row value. The implementation introduces a new cursor sub-package under x-pack/metricbeat/module/sql/query/cursor/ with the following architecture:

  • Config (config.go): Cursor configuration struct with validation — supports enabled, column, type (integer/timestamp/date/float/decimal), default, and direction (asc/desc) fields.
  • Manager (cursor.go): Manages cursor lifecycle — initializes from persisted state or default, updates by scanning query results for the max (ascending) or min (descending) value of the cursor column, and persists state after each fetch cycle.
  • Store (store.go): State persistence via libbeat/statestore with the memlog backend (same battle-tested backend used by Filebeat's registry). State is stored at {data.path}/sql-cursor/. State keys are xxhash-based — derived from (moduleID, DSN, query, column) — so no secrets are stored in the key itself.
  • Types (types.go): Type-safe cursor value handling with parsing from both configuration strings and database result values. Supports int, int32, int64, uint, uint32, uint64, float32, float64, []byte, string, and time.Time inputs. The decimal type uses shopspring/decimal for arbitrary precision.
  • Placeholder (placeholder.go): Translates the :cursor placeholder in user queries to driver-specific parameterized syntax ($1 for Postgres, ? for MySQL, @p1 for MSSQL/sqlserver, :1 for Oracle). Queries use db.QueryContext(ctx, query, args...) — fully SQL-injection safe with no string interpolation.

The query.go MetricSet is extended with:

  • Cursor initialization gated behind cursor.enabled: true in config
  • A Close() method implementing mb.Closer for proper statestore resource cleanup
  • A fetchWithCursor() path using the new FetchTableModeWithParams() helper for parameterized queries
  • Concurrent fetch prevention via sync.Mutex.TryLock() (skips cycle if previous still running)
  • At-least-once delivery guarantee: events are emitted to the reporter before cursor state is saved, so a crash after emit but before save results in duplicates on restart, never data loss

The metricbeat/helper/sql/sql.go shared helper gains:

  • FetchTableModeWithParams(): Same as FetchTableMode but accepts args ...interface{} for parameterized queries
  • mssql → sqlserver mapping in SwitchDriverName(): Aligns the SQL module with the standalone MSSQL module which already uses the modern sqlserver driver. The queryDBNames() and dbSelector() functions are updated to match both driver names.

The SQL module currently re-executes the full query on every collection cycle, fetching all rows each time. For append-only data sources (audit logs, event tables, time-series data), this causes:

  1. Increasing query execution time and database load as tables grow
  2. Duplicate event ingestion on every cycle
  3. No ability to "resume where we left off" after metricbeat restart

The cursor feature solves this by maintaining a persistent bookmark (the cursor) that tracks the last fetched row's value in a specified column. Subsequent queries use this value as a filter parameter (e.g., WHERE id > :cursor), fetching only new rows. This is the same pattern used by Filebeat's input.cursor and is a highly requested feature for the SQL module.

Please note that these changes were implemented using Cursor (AI), with systematic guidance and specific constraints.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability.
  • I have added an entry in ./changelog/fragments using the changelog tool.

Disruptive User Impact

None. The cursor feature is entirely opt-in via cursor.enabled: true in the metricset configuration. When cursor is not configured (the default), the existing code path is completely unchanged — no new fields are initialized, no new methods are called.

The mssql → sqlserver driver name mapping in SwitchDriverName() aligns with the standalone MSSQL module's existing behavior. The sqlserver driver is the modern, recommended driver and accepts the same URL-based connection strings. Existing MSSQL users of the SQL module should see no behavioral change.

Author's Checklist

  • All unit tests pass (go test ./x-pack/metricbeat/module/sql/query/cursor/...)
  • SQL module unit tests pass (go test ./x-pack/metricbeat/module/sql/query/...)
  • SQL helper tests pass (go test ./metricbeat/helper/sql/...)
  • MSSQL module tests pass (go test ./x-pack/metricbeat/module/mssql/...)
  • Full x-pack metricbeat builds cleanly (go build ./x-pack/metricbeat/...)
  • go vet clean on all modified packages
  • NilAway nil-safety checks added for cursor code (pre-existing NilAway findings in dsn.go and sql_test.go are unrelated)

How to test this PR locally

Unit tests (no database required)

# Run all cursor package tests
go test ./x-pack/metricbeat/module/sql/query/cursor/... -v -count=1

# Run SQL module query tests (includes DSN parsing)
go test ./x-pack/metricbeat/module/sql/query/... -v -count=1 -short

# Run SQL helper tests
go test ./metricbeat/helper/sql/... -v -count=1

# Run MSSQL module tests (verify no regression)
go test ./x-pack/metricbeat/module/mssql/... -v -count=1 -short

Integration tests (require database containers)

# PostgreSQL
docker run -d --name pg -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres:15
go test ./x-pack/metricbeat/module/sql/query/... -v -count=1 -run "TestCursor.*Postgres" -tags integration

# MySQL
docker run -d --name mysql -e MYSQL_ROOT_PASSWORD=secret -p 3306:3306 mysql:8
go test ./x-pack/metricbeat/module/sql/query/... -v -count=1 -run "TestCursor.*MySQL" -tags integration

# MSSQL
docker run -d --name mssql -e ACCEPT_EULA=Y -e SA_PASSWORD='1234_asdf' -p 1433:1433 mcr.microsoft.com/mssql/server:2022-latest
go test ./x-pack/metricbeat/module/sql/query/... -v -count=1 -run "TestCursor.*MSSQL" -tags integration

Manual end-to-end test

  1. Start a PostgreSQL container
  2. Create a test table with auto-increment ID:
    CREATE TABLE audit_events (id SERIAL PRIMARY KEY, event_type TEXT, created_at TIMESTAMP DEFAULT NOW());
    INSERT INTO audit_events (event_type) VALUES ('login'), ('logout'), ('access');
  3. Configure metricbeat:
    - module: sql
      metricsets: ["query"]
      period: 10s
      hosts: ["postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable"]
      driver: "postgres"
      sql_query: "SELECT id, event_type, created_at FROM audit_events WHERE id > :cursor ORDER BY id ASC LIMIT 100"
      sql_response_format: table
      cursor:
        enabled: true
        column: id
        type: integer
        default: "0"
  4. Run metricbeat — first cycle fetches 3 rows
  5. Insert more rows — next cycle fetches only the new rows
  6. Restart metricbeat — it resumes from the persisted cursor value (check data/sql-cursor/)

Use cases

Scenario: Incremental audit log ingestion

Given the SQL module is configured with cursor enabled on an auto-increment ID column
When metricbeat collects data on the first cycle
Then all rows with ID > default (0) are fetched and the cursor is updated to the max ID
When metricbeat collects data on subsequent cycles
Then only rows with ID > last_cursor_value are fetched
When metricbeat is restarted
Then the cursor value is loaded from disk and collection resumes from where it left off

Scenario: Time-based incremental fetch

Given the SQL module is configured with cursor type "timestamp" on a created_at column
When new rows are inserted between collection cycles
Then only the new rows (created_at >= last_cursor_value) are fetched

Scenario: Descending scan for latest-first workloads

Given the cursor direction is set to "desc"
When results are returned
Then the cursor tracks the minimum value instead of maximum

@shmsr shmsr requested review from a team as code owners February 6, 2026 06:13
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Feb 6, 2026
@github-actions
Copy link
Contributor

github-actions bot commented Feb 6, 2026

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

@mergify
Copy link
Contributor

mergify bot commented Feb 6, 2026

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @shmsr? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

@mergify mergify bot assigned shmsr Feb 6, 2026
@github-actions
Copy link
Contributor

github-actions bot commented Feb 6, 2026

Vale Linting Results

Summary: 1 suggestion found

💡 Suggestions (1)
File Line Rule Message
docs/reference/metricbeat/metricbeat-metricset-sql-query.md 344 Elastic.WordChoice Consider using 'refer to (if it's a document), view (if it's a UI element)' instead of 'see', unless the term is in the UI.

The Vale linter checks documentation changes against the Elastic Docs style guide.

To use Vale locally or report issues, refer to Elastic style guide for Vale.

@shmsr shmsr force-pushed the feat/sql-cursor-incremental-fetch branch 2 times, most recently from 3f8e80d to 635398a Compare February 6, 2026 06:23
@shmsr shmsr added the Team:Obs-InfraObs Label for the Observability Infrastructure Monitoring team label Feb 6, 2026
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Feb 6, 2026
@shmsr shmsr requested a review from stefans-elastic February 6, 2026 06:25
…odule

Add a cursor feature to the SQL query metricset that enables incremental
data fetching by tracking the last fetched row value. This is useful for
continuously appended data like audit logs, event tables, or time-series
data where users want to avoid re-fetching already-seen rows.

Key changes:

- New `cursor` sub-package with modular components:
  - `config.go`: Cursor configuration and validation
  - `cursor.go`: Manager for cursor lifecycle (init, update, persist)
  - `store.go`: State persistence via libbeat statestore (memlog backend)
  - `types.go`: Type-safe cursor value handling (integer, timestamp, date,
    float, decimal) with database value parsing
  - `placeholder.go`: Driver-specific query placeholder translation
    (e.g., :cursor -> $1 for Postgres, ? for MySQL, @p1 for MSSQL)
  - `doc.go`: Package documentation

- Modified `query.go` MetricSet:
  - Cursor initialization gated behind `cursor.enabled: true`
  - `Close()` method implementing `mb.Closer` for resource cleanup
  - `fetchWithCursor()` for parameterized query execution
  - Concurrent fetch prevention via `sync.Mutex.TryLock()`
  - At-least-once delivery: events emitted before cursor state is saved

- Modified `metricbeat/helper/sql/sql.go`:
  - Added `FetchTableModeWithParams()` for parameterized queries
  - Added `mssql -> sqlserver` driver name mapping in `SwitchDriverName()`
    (consistent with standalone MSSQL module which already uses sqlserver)

- Updated `queryDBNames()` and `dbSelector()` to accept both `mssql` and
  `sqlserver` driver names

- Fixed Oracle timezone handling in integration tests: set session
  `TIME_ZONE=UTC` to prevent godror driver timezone conversion issues

- Comprehensive test coverage:
  - Unit tests for all cursor sub-package components
  - Integration tests covering PostgreSQL, MySQL, MSSQL, and Oracle
  - Tests for edge cases: NULL values, missing columns, type mismatches,
    state persistence, descending cursors, concurrent fetch prevention

- Documentation updates for both module and metricset level docs

Supported cursor types: integer, timestamp, date, float, decimal
Supported directions: asc (track max, default), desc (track min)
@shmsr shmsr force-pushed the feat/sql-cursor-incremental-fetch branch from 66d98bc to d83f11f Compare February 6, 2026 06:38
shmsr added 2 commits February 6, 2026 12:10
Replace 'e.g.' with 'for example' in doc.go and query.go comments
to comply with Elastic Vale style guidelines (Elastic.Latinisms rule).
- Document cursor incompatibility with sql_queries and
  fetch_from_all_databases (users would get confusing errors otherwise)
- Add MSSQL example using TOP instead of LIMIT
- Add MSSQL driver-specific note (TOP, @p1 placeholder, mssql->sqlserver)
- Expand timestamp type description with accepted formats
- Document cursor state reset behavior on config changes
- Clarify that raw_data.enabled is optional (not required for cursor)
shmsr added 3 commits February 6, 2026 12:45
- errcheck: wrap deferred db.Exec calls to handle return values
  in cursor_integration_test.go (lines 135, 692)
- goimports: fix whitespace alignment in cursor_integration_test.go
  (lines 185, 823/986 trailing space)
- gosec G115: suppress false-positive uint->int64 overflow warning
  in types.go:206 (bounds check exists on line 203)
- Vale Elastic.WordChoice: replace 'may' with 'can'/'might' in docs
  (lines R221, R233, R234, R253)
- Vale Elastic.Semicolons: replace semicolon with period in MSSQL
  driver note (line R268)
- cursor_test.go: fix comment alignment in table-driven test data
- store.go: use Go 1.13+ octal literal 0o600 instead of 0600
@shmsr shmsr requested a review from a team as a code owner February 6, 2026 07:19
@github-actions
Copy link
Contributor

github-actions bot commented Feb 6, 2026

@pierrehilbert pierrehilbert added the Team:Docs Label for the Observability docs team label Feb 6, 2026
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@shmsr
Copy link
Member Author

shmsr commented Mar 5, 2026

I also looked briefly at x-pack/metricbeat/module/sql/query/cursor/store.go and the docs. It would be nice to add some information in the docs about which values are used to create the store key, explaining what would cause the cursor to be reset.

It is already there in doc.go (in detail) and in the user-facing docs, we have it in high-level.

I noticed the query and DSN/URI go into the cursor, I could see those changing without changing the database/results of the query, which would cause a re-ingestion of all the data.

Right, but I think that is the right decision. Although, not letting go of the DSN, but I have removed module ID and added cursor direction to the key.

Not letting of go of DSN (or reducing it to "host:port") because:

  • Multi-host configuration (hosts: [host1, host2]): Both MetricSets would share the same cursor — one would overwrite the other's progress, causing silent data loss
  • Same query on different databases (prod_db vs test_db): Cursors would collide, a high ID in test_db could cause prod_db to skip rows
  • etc

cc: @belimawr

Addressed this with a new opt-in feature.

Please see: https://github.com/elastic/obs-integration-team/issues/830#issuecomment-3860018902 (See: "Mitigation for password rotation"); "cursor.state_id" is the new config.

@ishleenk17
Copy link
Member

@claude pls review

@shmsr
Copy link
Member Author

shmsr commented Mar 9, 2026

Reminder for review!

@github-actions
Copy link
Contributor

Analyzed workflow run 22890212377 for this PR: all jobs completed successfully (check job conclusion: success), so there is no failing step to diagnose.

The only notable log line was a post-job cache save warning (Unable to reserve cache ... another job may be creating this cache), which is non-blocking and did not affect check outcome.

Tests/checks run in this workflow: docs/check pipeline (Run check) completed successfully; no follow-up remediation is required for this run.


What is this? | From workflow: PR Actions Detective

Give us feedback! React with 🚀 if perfect, 👍 if helpful, 👎 if not.

Copy link
Member

@ishleenk17 ishleenk17 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Team:Docs Label for the Observability docs team Team:Obs-InfraObs Label for the Observability Infrastructure Monitoring team

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants