[SigEvents] Add KI feature identification endpoints and refactor task to use shared service#263528
Conversation
Extract feature identification logic into a shared service (features_identification_service.ts) with two top-level functions: identifyInferredFeatures and identifyComputedFeatures. Add two internal endpoints (_identify/inferred and _identify/computed) as thin wrappers around the service, designed for use by the upcoming Workflows Management orchestrator. Made-with: Cursor
Replace FeatureAccumulator, identifyStreamFeatures, and duplicated reconciliation logic with calls to identifyInferredFeatures and identifyComputedFeatures from the shared service. The task now mirrors the workflow YAML structure: init state, loop N iterations calling the service, then run computed features sequentially. Made-with: Cursor
- Slim AccumulatedIterationState to discoveredFeatures + iterationResults; derive successCount and totalTokensUsed via helpers - Make runId optional (auto-generated), remove iteration/successCount/ totalTokensUsed from route params - Remove iteration/successCount from computed route (caller responsibility) - Move Zod schemas (tokenCountSchema, iterationResultSchema) to @kbn/streams-schema alongside the TypeScript types - Export MS_PER_DAY from service, remove duplicate in route - Replace EMPTY_ACCUMULATED_STATE const with createEmptyAccumulatedState() factory - Replace _/__ throwaway destructuring with explicit tuning construction - Restore computed features parallelism in task (Promise started before iteration loop) - Handle computedFeaturesPromise rejection on error path - Distinguish logger namespaces: inferred vs computed - Add error logging with context to both route handlers Made-with: Cursor
|
/ci |
|
@coderabbitai full review |
✅ Actions performedFull review triggered. |
1 similar comment
✅ Actions performedFull review triggered. |
📝 WalkthroughWalkthroughAdds runtime Zod schemas ( ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
✅ Actions performedFull review triggered. |
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Repository YAML (base), Organization UI (inherited) Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (7)
📝 WalkthroughWalkthroughThis change introduces a feature identification system for SIG Events. It adds Zod validation schemas ( ✨ Finishing Touches🧪 Generate unit tests (beta)
Warning Tools execution failed with the following error: Failed to run tools: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error) Comment |
Made-with: Cursor
Break the 732-line monolith into cohesive files by responsibility: - iteration_state.ts: accumulated state types and helpers - reconcile_features.ts: inferred/computed feature reconciliation - identify_inferred_features.ts: LLM iteration, telemetry, top-level handler - identify_computed_features.ts: computed features handler - index.ts: barrel re-exports preserving the public API Made-with: Cursor
…inline durationMs - Replace manual IterationResult interface with z.infer<typeof iterationResultSchema> - Track failure telemetry in inferred features route catch block - Compute durationMs inline instead of via thunk Made-with: Cursor
…try iteration count - Drop iterationResult field from IdentifyInferredFeaturesResult (always last element of state.iterationResults) - Fix off-by-one in route failure telemetry: use iterationResults.length + 1 Made-with: Cursor
Made-with: Cursor
|
@coderabbitai full review |
✅ Actions performedFull review triggered. |
ApprovabilityVerdict: Needs human review This PR adds two new internal API endpoints for KI feature identification and refactors the background task to use shared services. The new endpoints expose previously task-only functionality via HTTP, representing new capability. Open review comments question the API design approach (leaking internal state to clients). All changed files are owned by teams other than the author. You can customize Macroscope's approvability policy. Learn more. |
✅ Actions performedFull review triggered. |
1 similar comment
✅ Actions performedFull review triggered. |
| discoveredFeatures: z.array(featureSchema).optional().default([]), | ||
| iterationResults: z.array(iterationResultSchema).optional().default([]), |
There was a problem hiding this comment.
To avoid that we'd need to persist that state somehow, any chance the workflow already stores and exposes the results of the previous steps so we can lean on that ?
Otherwise, leaking all that internal state to the client feels off, so I'm wondering if we should instead have a convenience api that does the looping internally.
What would we lose if we call a route that handles the looping with a iterations parameter, instead of a single step ?
There was a problem hiding this comment.
The endpoint is designed to be workflow-agnostic, it shouldn't be coupled to the workflow engine's state management, so I'd prefer not to have it read previous step results from the workflow context.
That said, I'd like to understand what specifically feels off about passing discoveredFeatures and iterationResults back. These are internal endpoints, the caller is always Kibana itself.
What is your concern about?
There was a problem hiding this comment.
My main concern is pushing too much state to the client:
- for
discoveredFeatures, it looks like a gap in storage that we attempt to patch - for
iterationResults, it looks like the server only reads its length (for the iteration counter in telemetry and the response), so perhaps we don't need it at all
Agreed that coupling to Workflow isn't a good choice, but how about storing run_id on the features ? The endpoint could then determine what was discovered in this run without relying on the client, and it gives us an audit trail for cheap.
For iterationResults, we could drop it from the request entirely, or replace the array with an integer if we still want to stamp the iteration number server side
There was a problem hiding this comment.
Good call on the run_id approach, I've implemented it:
- Features now get tagged with
run_idwhen persisted. The endpoint derivesdiscoveredFeaturesby filtering stored features onrun_id, so the caller no longer passes them back. iterationResultsremoved from the request body, replaced with an optionaliteration: number(the only thing the server actually needed was the counter).
The request body is now just tuning parameters + runId + iteration. No internal state roundtrips.
Features are now tagged with run_id when persisted. The inferred identification endpoint derives discoveredFeatures from ES by filtering on run_id instead of requiring them from the caller. iterationResults removed from request body, replaced with an optional iteration number parameter. Made-with: Cursor
| streamName: string; | ||
| featureTtlDays?: number; | ||
| }): Feature[] { | ||
| const metadata = createFeatureMetadata({ featureTtlDays }); |
There was a problem hiding this comment.
Is there any downside to add the run_id to computed features ? I know for now we only keep one so it's not that useful, but if we move to data stream we'll likely keep history of it and a single query with a given run_id would give you the complete snapshot of discovered features for a run
There was a problem hiding this comment.
It makes sense to add the run_id to computed features as well, done here: 144c5af
klacabane
left a comment
There was a problem hiding this comment.
LGTM - tested by manually calling the endpoint. Features from previous iterations of the same run are correctly fed back to next iterations.
Left a comment about persisting the run_id for computed features
Make runId required across all feature reconciliation paths. The computed features route now generates a runId when not provided, matching the inferred route behavior. This enables querying a complete feature snapshot by run_id. Made-with: Cursor
💚 Build Succeeded
Metrics [docs]Module Count
Public APIs missing comments
Async chunks
Public APIs missing exports
History
|
…sationChanges23 * commit '9a7b717c662d1c904052bc59f0e5a81daab87c7f': (145 commits) Upgrade EUI to v114.2.0 (elastic#264550) [Entity Analytics] Add missing OpenAPI descriptions and examples to p… (elastic#264778) [Entity Resolution] Clarify CSV upload result for already-linked entities (elastic#264689) [AI Infra] Fix failing GenAI Settings Scout tests (elastic#260496) [Agent Builder] [Bug Bash] OAuth connector settings mention fields that are not there (elastic#264756) [performance] process-wide cache for advanced settings lookup (elastic#262618) [CI] Update limits.yml for securitySolution (elastic#264946) [SLO] Fix APM embeddable ids (elastic#264750) [EDR Workflows] Unify artifacts empty state buttons (elastic#264389) [Alert Triage workflow] Adds security.buildAlertEntityGraph and security.renderAlertNarrative… (elastic#259159) [SigEvents] Add KI feature identification endpoints and refactor task to use shared service (elastic#263528) [Scout] Migrate Data Views API tests from FTR - Part5 (elastic#264088) [Cases] Apply shared extended_fields path util server side (elastic#264706) [Lens as code] Fix metric trendline (elastic#264777) [api-docs] 2026-04-22 Daily api_docs build (elastic#264882) [Scout] Update test config manifests (elastic#264575) [workflows_management] Lazy-load Zod connector schemas to cut idle memory (elastic#264283) [ES|QL] Fix ES|QL columns reset race during active fetch (elastic#263947) [Content List] Column layout props, sticky actions, and title click handlers (elastic#264203) [Lens as code] Validate `id` in route for new vis types (elastic#264480) ...
… to use shared service (elastic#263528) ## Summary Adds two internal endpoints for KI feature identification and refactors the Task Manager task to use the same shared logic, eliminating code duplication and preparing for the upcoming Workflows Management migration. ### New internal endpoints - **`POST /internal/streams/{name}/features/_identify/inferred`** — Runs one iteration of feature identification: samples documents, runs LLM inference, reconciles results against known/excluded features, persists changes, and emits EBT telemetry. Accepts and returns accumulated state (`discoveredFeatures`, `totalTokensUsed`, `successCount`, `iterationResults`) so a caller can drive the iteration loop externally. - **`POST /internal/streams/{name}/features/_identify/computed`** — Generates computed KI features (e.g. from ES aggregations), reconciles UUIDs/metadata, and persists them. These endpoints are designed for the declarative features identification workflow that will replace the Task Manager task once Workflows Management is available.
Summary
Adds two internal endpoints for KI feature identification and refactors the Task Manager task to use the same shared logic, eliminating code duplication and preparing for the upcoming Workflows Management migration.
New internal endpoints
POST /internal/streams/{name}/features/_identify/inferred— Runs one iteration of feature identification: samples documents, runs LLM inference, reconciles results against known/excluded features, persists changes, and emits EBT telemetry. Accepts and returns accumulated state (discoveredFeatures,totalTokensUsed,successCount,iterationResults) so a caller can drive the iteration loop externally.POST /internal/streams/{name}/features/_identify/computed— Generates computed KI features (e.g. from ES aggregations), reconciles UUIDs/metadata, and persists them.These endpoints are designed for the declarative features identification workflow that will replace the Task Manager task once Workflows Management is available.