Skip to content

[New Filebeat Input] Akamai#48846

Closed
ShourieG wants to merge 29 commits into
elastic:mainfrom
ShourieG:feature/akamai
Closed

[New Filebeat Input] Akamai#48846
ShourieG wants to merge 29 commits into
elastic:mainfrom
ShourieG:feature/akamai

Conversation

@ShourieG
Copy link
Copy Markdown
Contributor

@ShourieG ShourieG commented Feb 13, 2026

Type of change

  • Enhancement
  • Docs

Proposed commit message

x-pack/filebeat/input/akamai: introduce a dedicated Akamai SIEM input with
v2.Input architecture, batch PublishAll, and ACK-based cursor persistence

Add a new dedicated Akamai input to Filebeat for collecting security events
from the Akamai SIEM v1 API. The input implements v2.Input directly (not
inputcursor) with batch event publishing via PublishAll, decoupled cursor
persistence via ACK callbacks, and a chain-based recovery model. NDJSON events
are streamed line-by-line through a bounded channel with zero-copy passthrough.
EdgeGrid HMAC-SHA256 authentication is built into the client. Includes rate
limiting, comprehensive metrics, and structured recovery for offset expiry,
invalid timestamps, and lookback boundary clamping.

Benchmarks at event_limit=60000 show ~11,000 events/sec sustained throughput
with the default configuration (1 worker, batch_size=1000), up from ~7,200
with the previous inputcursor-based approach.

WHAT

  • New dedicated Akamai input under x-pack/filebeat/input/akamai with EdgeGrid HMAC-SHA256 signing built into the client.
  • v2.Input architecture: Implements v2.Input directly (like netflow/awss3) instead of inputcursor.Input. This gives direct access to beat.PipelineConnector for batch publishing and custom ACK handling. No dependency on the inputcursor framework — cursor persistence, pipeline client creation, and ACK tracking are all managed within the input.
  • Batch publishing with PublishAll: Workers accumulate events into configurable batches (batch_size, default 1000) and publish via client.PublishAll(), reducing pipeline lock acquisitions from one-per-event to one-per-batch. With event_limit=60000 and batch_size=1000, that's 60 lock acquisitions per page instead of 60,000.
  • Decoupled ACK-based cursor persistence: The cursor is never written at publish time. Instead, an ACK handler (adapted from the awss3 pattern) tracks published event counts per page and fires a callback when all events are acknowledged by the output. The callback atomically persists the full cursor state (chain_from, chain_to, last_offset, offset_obtained_at) to the statestore. This ensures zero partial cursor writes and exactly one store write per page.
  • Streaming pipeline: NDJSON events are streamed line-by-line through a bounded channel (stream_buffer_size, default workers * 4) to worker goroutines — no full-page buffering. A one-line delay pattern cleanly separates the offset context from events. Raw JSON is passed through as-is; unmarshalling is left to ingest pipelines.
  • Chain-based cursor recovery: Each poll cycle works on a time window (chain_from/chain_to) and drains it via offset pagination. Chains overlap by 10s to prevent boundary gaps. Offsets are proactively dropped when they exceed a configurable TTL, avoiding unnecessary 416 round-trips.
  • Recovery handling:
    • 416 / stale offset → replay the current chain window
    • 400 invalid timestamp → refresh HMAC and retry (configurable attempts), then fall back to chain replay
    • from too old → clamp to 12h lookback and continue
    • Other 400s → non-recoverable for the cycle
    • max_recovery_attempts cap (default 3) prevents infinite loops
  • Rate limiting via golang.org/x/time/rate, comprehensive metrics (counters, histograms, worker utilization), and structured error logging with full cursor context.
  • Test coverage: config validation, EdgeGrid signing, metrics, batch-mode publishing, and table-driven E2E scenarios using a shared mock server (pagination, recovery paths, context cancellation).

WHY

The CEL-based Akamai integration was hitting reliability issues in production — data latency and gaps caused by the SIEM v1 API's short-lived offset tokens (~2min TTL) and tight HMAC timestamp windows. A dedicated input gives us direct control over pagination, recovery, and cursor management.

We chose v2.Input over inputcursor.Input for three reasons:

  1. PublishAll access — inputcursor's Publisher only exposes per-event Publish(event, cursor), which grabs the pipeline lock on every call. With v2.Input we get the beat.Client directly and can batch events through PublishAll, taking the lock once per batch instead of once per event.
  2. Cleaner cursor persistence — inputcursor couples cursor state to every publish call via updateOp, which means cursor writes happen on every ACK batch. With our approach, the cursor is written exactly once per page, atomically, only after all events are confirmed delivered by the output. No partial state ever hits the store.

Benchmark Results

All benchmarks ran against the same Akamai SIEM proxy (proteus-akamai-94a79776.sit.estc.dev) with event_limit=60000, file output, and manual Ctrl+C termination. Each page returns exactly 60,000 events.

Full Comparison (7 runs, sorted by sustained throughput)

Config Arch Workers Batch Queue Pages Events Acked Active (s) evt/s Avg Page (s) Min Page (s) Max Page (s) Fast <5s Slow >10s CPU % RSS (MB) Lock/Page Cursor Writes/Page
1W/1k/10kQ v2 1 1,000 10k 22 1,352,026 123 10,992 3.11 2.83 6.62 95% 0% 34.9% 103 60 1
1W/2k/16kQ v2 1 2,000 16k 41 2,460,000 231 10,649 3.23 2.83 7.53 93% 0% 39.9% 112 30 1
10W/2k/16kQ v2 10 2,000 16k 32 1,920,000 199 9,648 3.97 3.07 12.48 84% 6% 35.7% 168 30 1
20W/2k/10kQ v2 20 2,000 10k 16 960,000 115 8,348 4.61 3.38 11.23 75% 6% 28.4% 217 30 1
20W/5k/10kQ v2 20 5,000 10k 18 1,037,854 148 7,297 6.21 4.46 14.24 61% 11% 27.4% 292 12 1
10W/1k/10kQ v2 10 1,000 10k 16 983,908 137 7,189 6.13 2.99 11.60 50% 19% 26.0% 132 60 1
10W/none (baseline) inputcursor 10 1 16k 21 1,260,000 176 7,159 6.28 2.90 15.08 67% 29% 33.3% 116 60,000 ~60

What we observed

  1. The v2.Input + PublishAll architecture is a clear improvement over inputcursor. Every v2 run outperformed or matched the inputcursor baseline. The best v2 configuration hit ~11,000 evt/s vs ~7,200 for the baseline — roughly a 49% gain. This comes from two things: batch publishing reduces pipeline lock acquisitions dramatically (60 per page vs 60,000), and the decoupled cursor model eliminates per-event updateOp overhead entirely.

  2. Cursor persistence is much cleaner. The inputcursor approach wrote cursor state ~60 times per page (once per ACK batch). The v2 approach writes exactly once per page, atomically, after all events are confirmed delivered. No partial cursor state is ever persisted, which simplifies recovery semantics significantly.

  3. At event_limit=60000, single-worker configurations performed best. The 1-worker runs consistently topped the throughput chart and had the most stable page durations (93-95% of pages under 5s, zero pages over 10s). Multi-worker runs showed lower throughput and more variability. This makes sense at 60k — the per-event work is minimal (string copy + beat.Event construction), so the overhead of goroutine scheduling and concurrent queue access outweighs any parallelism benefit.

  4. That said, worker scaling matters at higher event limits. The API supports event_limit up to 600,000. At 600k events per page, a single worker would need to stream and publish 10x more events sequentially, and the output could become the bottleneck — especially with Elasticsearch or Kafka where each PublishAll blocks on network I/O. We expect multi-worker configurations to become valuable at higher event limits and with latency-sensitive outputs. This needs further testing.

  5. batch_size between 1000-2000 is the sweet spot for these runs. Going beyond 2000 didn't help — the 5k batch run was actually slower due to memory overhead and the time spent accumulating events before each publish. Below 1000 (per-event publish) is where the old inputcursor approach lived, and that's clearly suboptimal.

  6. Memory scales with workers, not with events. RSS ranged from 103 MB (1 worker) to 292 MB (20 workers, 5k batch). The actual working set (memory_alloc) was comparable across all runs (~15-21 MB). The RSS delta is mostly Go runtime address space reservation for goroutine stacks and batch buffers.

  7. Queue sizing is secondary. We saw no meaningful throughput difference between 10k and 16k queue capacities. The queue and output flush settings are fine-tuning knobs, not performance drivers.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability.
  • I have added an entry in ./changelog/fragments using the changelog tool.

Disruptive User Impact

  • This introduces a new input type (akamai) and does not change behavior for existing inputs.

Author's Checklist

  • Core polling/recovery logic validated through mock-server scenarios
  • Metrics behavior validated through dedicated tests
  • Error and recovery branches exercised (offset expiry, invalid timestamp, non-recoverable 400)
  • Batch publishing validated with multiple batch_size and worker configurations
  • ACK-based cursor persistence verified — exactly one atomic write per page, zero partial state
  • No panics — all type assertions removed, all error paths return errors
  • Benchmarked with Akamai proxy at 60k events/page across 7 configurations
  • Documentation updated with new config options and performance tuning guidance

How to test this PR locally

  1. Run full Akamai input tests:
    • go test ./x-pack/filebeat/input/akamai -count=1
  2. Run input flow scenarios only:
    • go test ./x-pack/filebeat/input/akamai -run TestInput -count=1
  3. Run batch publishing tests:
    • go test ./x-pack/filebeat/input/akamai -run TestProcessPageBatchPublish -count=1
  4. Run focused config + metrics tests:
    • go test ./x-pack/filebeat/input/akamai -run TestConfigValidation -count=1
    • go test ./x-pack/filebeat/input/akamai -run TestInputMetrics -count=1

Related issues

Use cases

Screenshots

Logs

Enforce a single EdgeGrid auth path and align Akamai polling behavior around offset expiry, invalid timestamp retries, and page-level cursor checkpointing with stronger runtime metrics/logging.
… tests

Reorganize tests by concern and add a table-driven mock-server harness that validates pagination, recovery, error handling, and partial publish behavior end-to-end.
Isolate workspace ignore updates from Akamai input functional changes to keep review history clean.
@botelastic botelastic Bot added the needs_team Indicates that the issue/PR needs a Team:* label label Feb 13, 2026
@ShourieG ShourieG self-assigned this Feb 13, 2026
@github-actions
Copy link
Copy Markdown
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

@ShourieG ShourieG requested a review from a team February 13, 2026 13:02
@mergify
Copy link
Copy Markdown
Contributor

mergify Bot commented Feb 13, 2026

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @ShourieG? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

@ShourieG ShourieG added the Team:Security-Service Integrations Security Service Integrations Team label Feb 13, 2026
@botelastic botelastic Bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Feb 13, 2026
@ShourieG ShourieG added Filebeat Filebeat needs_team Indicates that the issue/PR needs a Team:* label labels Feb 13, 2026
@botelastic botelastic Bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Feb 13, 2026
@ShourieG ShourieG added enhancement needs_team Indicates that the issue/PR needs a Team:* label labels Feb 13, 2026
@botelastic botelastic Bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Feb 13, 2026
@ShourieG ShourieG added needs_team Indicates that the issue/PR needs a Team:* label new input (filebeat) A new input for file beat labels Feb 13, 2026
@botelastic botelastic Bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Feb 13, 2026
ShourieG and others added 3 commits February 17, 2026 18:36
Add full input documentation following existing x-pack filebeat
conventions (config options, recovery behavior, metrics table, tracer
rotation, common options). Link the new page from the input type index.
Mark the input as beta in code, lower default recovery_interval to 1h,
and remove the superseded doc.go.

Co-authored-by: Cursor <cursoragent@cursor.com>
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Feb 17, 2026

Vale Linting Results

Summary: 1 warning, 9 suggestions found

⚠️ Warnings (1)
File Line Rule Message
docs/reference/filebeat/filebeat-input-akamai.md 26 Elastic.Latinisms Latin terms and abbreviations are a common source of confusion. Use 'using' instead of 'via'.
💡 Suggestions (9)
File Line Rule Message
docs/reference/filebeat/filebeat-input-akamai.md 116 Elastic.Semicolons Use semicolons judiciously.
docs/reference/filebeat/filebeat-input-akamai.md 135 Elastic.WordChoice Consider using 'deactivate, deselect, hide, turn off' instead of 'disable', unless the term is in the UI.
docs/reference/filebeat/filebeat-input-akamai.md 145 Elastic.WordChoice Consider using 'deactivate, deselect, hide, turn off' instead of 'disable', unless the term is in the UI.
docs/reference/filebeat/filebeat-input-akamai.md 150 Elastic.WordChoice Consider using 'deactivate, deselect, hide, turn off' instead of 'disable', unless the term is in the UI.
docs/reference/filebeat/filebeat-input-akamai.md 299 Elastic.WordChoice Consider using 'deactivate, deselect, hide, turn off' instead of 'disable', unless the term is in the UI.
docs/reference/filebeat/filebeat-input-akamai.md 304 Elastic.WordChoice Consider using 'efficient' instead of 'easy', unless the term is in the UI.
docs/reference/filebeat/filebeat-input-akamai.md 363 Elastic.Semicolons Use semicolons judiciously.
docs/reference/filebeat/filebeat-input-akamai.md 368 Elastic.WordChoice Consider using 'deactivate, deselect, hide, turn off' instead of 'disable', unless the term is in the UI.
docs/reference/filebeat/filebeat-input-akamai.md 370 Elastic.WordChoice Consider using 'deactivate, deselect, hide, turn off' instead of 'disable', unless the term is in the UI.

The Vale linter checks documentation changes against the Elastic Docs style guide.

To use Vale locally or report issues, refer to Elastic style guide for Vale.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Feb 17, 2026

@ShourieG ShourieG added the docs label Feb 17, 2026
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
x-pack/filebeat/input/akamai/input.go (2)

254-257: ⚠️ Potential issue | 🔴 Critical

Zero-event pages never mark chain as drained.

When eventCount == 0, the loop breaks without setting p.cursor.CaughtUp = true. The next poll interval will replay the same chain_from/chain_to window indefinitely since the chain never advances.

Proposed fix
 		if eventCount == 0 {
 			p.log.Debug("no events received, poll cycle complete")
+			p.cursor.CaughtUp = true
 			break
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@x-pack/filebeat/input/akamai/input.go` around lines 254 - 257, When
eventCount == 0 inside the poll loop, set the cursor's caught-up marker before
breaking so the chain is marked drained; specifically, in the block checking
eventCount (the branch in which you currently call p.log.Debug("no events
received, poll cycle complete") and break), add p.cursor.CaughtUp = true (or
call the appropriate method on p.cursor that marks it caught up) so that
p.cursor.CaughtUp is true prior to break and the chain_from/chain_to window will
advance on the next poll. Ensure you reference p.cursor.CaughtUp (or the
cursor's mark-as-drained method) in the same scope as the eventCount check.

572-586: ⚠️ Potential issue | 🔴 Critical

Persisted cursor omits CaughtUp flag.

fullCursor is built without the CaughtUp field. After restart, the loaded cursor always has CaughtUp=false, causing a drained chain to be treated as in-progress. This forces unnecessary chain replays on every restart.

Proposed fix
 	if totalPublished > 0 {
 		fullCursor := cursor{
 			ChainFrom:        p.cursor.ChainFrom,
 			ChainTo:          p.cursor.ChainTo,
+			CaughtUp:         eventCount < p.cfg.EventLimit,
 			LastOffset:       pageCtx.Offset,
 			OffsetObtainedAt: time.Now(),
 		}

Note: eventCount needs to be passed to or computed in this scope, or use p.cursor.CaughtUp which is set at line 275.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@x-pack/filebeat/input/akamai/input.go` around lines 572 - 586, The persisted
cursor (`fullCursor`) is missing the CaughtUp field so restarts load
CaughtUp=false and replay drained chains; update the construction of fullCursor
inside the totalPublished > 0 block to include the CaughtUp value (either
pass/compute eventCount here and set CaughtUp = eventCount == 0, or copy
p.cursor.CaughtUp) before calling p.cursorStore.Save; make this change where
fullCursor is created and ensure the value persists through the p.acks.Add
callback so Save stores the correct CaughtUp state.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@x-pack/filebeat/input/akamai/config.go`:
- Around line 231-297: After unpacking the YAML into a config in
InputManager.Create(), call config.Validate() and return any validation error
before proceeding; specifically modify InputManager.Create() to invoke the
config.Validate() method (the Validate function on type config) immediately
after unmarshalling/decoding and before using the config or registering the
input so invalid configs cannot reach Run(); ensure the Create() function
propagates the returned error to the caller instead of continuing on success.

---

Duplicate comments:
In `@x-pack/filebeat/input/akamai/input.go`:
- Around line 254-257: When eventCount == 0 inside the poll loop, set the
cursor's caught-up marker before breaking so the chain is marked drained;
specifically, in the block checking eventCount (the branch in which you
currently call p.log.Debug("no events received, poll cycle complete") and
break), add p.cursor.CaughtUp = true (or call the appropriate method on p.cursor
that marks it caught up) so that p.cursor.CaughtUp is true prior to break and
the chain_from/chain_to window will advance on the next poll. Ensure you
reference p.cursor.CaughtUp (or the cursor's mark-as-drained method) in the same
scope as the eventCount check.
- Around line 572-586: The persisted cursor (`fullCursor`) is missing the
CaughtUp field so restarts load CaughtUp=false and replay drained chains; update
the construction of fullCursor inside the totalPublished > 0 block to include
the CaughtUp value (either pass/compute eventCount here and set CaughtUp =
eventCount == 0, or copy p.cursor.CaughtUp) before calling p.cursorStore.Save;
make this change where fullCursor is created and ensure the value persists
through the p.acks.Add callback so Save stores the correct CaughtUp state.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d0efe69d-2b68-4dd4-8b2a-c016ee1222df

📥 Commits

Reviewing files that changed from the base of the PR and between fef0043 and 19d1aa3.

📒 Files selected for processing (4)
  • docs/reference/filebeat/filebeat-input-akamai.md
  • x-pack/filebeat/input/akamai/config.go
  • x-pack/filebeat/input/akamai/config_test.go
  • x-pack/filebeat/input/akamai/input.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • x-pack/filebeat/input/akamai/config_test.go

Comment thread x-pack/filebeat/input/akamai/config.go
@ShourieG
Copy link
Copy Markdown
Contributor Author

@andrewkroh, @efd6, @belimawr All comments have been resolved and AI review feedback also addressed.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@x-pack/filebeat/input/akamai/input.go`:
- Around line 254-257: When eventCount == 0 the code sets p.cursor.CaughtUp
in-memory but never persists it because totalPublished == 0 prevents the normal
ACK path; update the branch that sets p.cursor.CaughtUp to also call the
cursor-persistence routine so the caught-up state is written out (i.e., after
p.cursor.CaughtUp = true invoke your existing cursor save/persist method—e.g.,
p.saveCursor/p.persistCursor/p.storeCursor—or the function that writes p.cursor
to disk/state). Apply the same change to the other code path that updates the
ChainTo/CaughtUp state so the drained/caught-up flag is persisted even when
totalPublished is zero.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: dc512d0a-0da0-47b0-8df7-b9af091698ec

📥 Commits

Reviewing files that changed from the base of the PR and between 19d1aa3 and 9a9cd5d.

📒 Files selected for processing (1)
  • x-pack/filebeat/input/akamai/input.go

Comment thread x-pack/filebeat/input/akamai/input.go
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@x-pack/filebeat/input/akamai/input.go`:
- Line 78: The deferred cs.Close() can run while asynchronous ACK callbacks
(spawned via go result.ackCallback() in x-pack/filebeat/input/akamai/acks.go)
are still executing, risking failed cursor writes; modify the ACK handling so
shutdown waits for all callbacks before closing the cursorStore: either run
callbacks synchronously instead of launching goroutines in the ACK handler or
add a synchronization barrier (e.g., a WaitGroup stored on the ACK manager) that
increments when scheduling result.ackCallback() and is awaited during shutdown
before calling cs.Close(), ensuring all callbacks complete before
cursorStore.Close().

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2bd8c219-348b-4d42-8a70-1d7e4fc49fd6

📥 Commits

Reviewing files that changed from the base of the PR and between 36d3745 and aa1dee0.

📒 Files selected for processing (2)
  • x-pack/filebeat/input/akamai/input.go
  • x-pack/filebeat/input/akamai/input_test.go

Comment thread x-pack/filebeat/input/akamai/input.go
Copy link
Copy Markdown
Member

@belimawr belimawr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at a few files, mostly input.go, store.go and acks.go, focusing on how the input interacts with the pipeline and the store.

Overall LGTM.

I was looking at the test coverage and akamaiInput.Run is not tested at all. That concerns me because this is the part of the code that brings it all together: the pipeline, the store, the client (reading from Akamai), etc.

For other input V2 we usually mock the pipeline and the store for testing and also have some integration tests that run the whole Filebeat.

To test the inputs we have some mocks for the pipeline, store and a bit of extra code to start stop the input, you can look at https://github.com/elastic/beats/blob/main/filebeat/input/journald/environment_test.go as an example.

Another option would be an integration test that runs Filebeat and uses a mock for the Akamai API (I'm not sure how easy it would be to mock it). There are plenty of examples at https://github.com/elastic/beats/tree/main/filebeat/tests/integration, one simple one is

func TestFilestreamCleanInactive(t *testing.T) {
filebeat := integration.NewBeat(
t,
"filebeat",
"../../filebeat.test",
)
tempDir := filebeat.TempDir()
// 1. Generate the log file path, but do not write data to it
logFilePath := filepath.Join(tempDir, "log.log")
msgLogFilePath := logFilePath
if runtime.GOOS == "windows" {
msgLogFilePath = strings.ReplaceAll(logFilePath, `\`, `\\`)
}
// 2. Write configuration file and start Filebeat
filebeat.WriteConfigFile(fmt.Sprintf(filestreamCleanInactiveCfg, logFilePath, tempDir))
filebeat.Start()
// 3. Create the log file
integration.WriteLogFile(t, logFilePath, 10, false)
// 4. Wait for Filebeat to start scanning for files
filebeat.WaitLogsContains(
fmt.Sprintf("A new file %s has been found", msgLogFilePath),
10*time.Second,
"Filebeat did not start looking for files to ingest")
filebeat.WaitLogsContains(
fmt.Sprintf("File is inactive. Closing. Path='%s'", msgLogFilePath),
10*time.Second,
"File did not became inactive")
filebeat.WaitLogsContains(
fmt.Sprintf("Closed reader. Path='%s'", msgLogFilePath),
10*time.Second, "Filebeat did not close the file")
// 5. Now that the reader has been closed, nothing is holding the state
// of the file, so once the TTL of its state expires and the store GC runs,
// it will be removed from the registry.
// Wait for the log message stating 1 entry has been removed from the registry
filebeat.WaitLogsContains("1 entries removed", 20*time.Second, "entry was not removed from registry")
// 6. Then assess it has been removed in the registry
registryFile := filepath.Join(filebeat.TempDir(), "data", "registry", "filebeat", "log.json")
filebeat.WaitFileContains(registryFile, `"op":"remove"`, time.Second)
}

@ShourieG
Copy link
Copy Markdown
Contributor Author

I looked at a few files, mostly input.go, store.go and acks.go, focusing on how the input interacts with the pipeline and the store.

Overall LGTM.

I was looking at the test coverage and akamaiInput.Run is not tested at all. That concerns me because this is the part of the code that brings it all together: the pipeline, the store, the client (reading from Akamai), etc.

For other input V2 we usually mock the pipeline and the store for testing and also have some integration tests that run the whole Filebeat.

To test the inputs we have some mocks for the pipeline, store and a bit of extra code to start stop the input, you can look at https://github.com/elastic/beats/blob/main/filebeat/input/journald/environment_test.go as an example.

Another option would be an integration test that runs Filebeat and uses a mock for the Akamai API (I'm not sure how easy it would be to mock it). There are plenty of examples at https://github.com/elastic/beats/tree/main/filebeat/tests/integration, one simple one is

func TestFilestreamCleanInactive(t *testing.T) {
filebeat := integration.NewBeat(
t,
"filebeat",
"../../filebeat.test",
)
tempDir := filebeat.TempDir()
// 1. Generate the log file path, but do not write data to it
logFilePath := filepath.Join(tempDir, "log.log")
msgLogFilePath := logFilePath
if runtime.GOOS == "windows" {
msgLogFilePath = strings.ReplaceAll(logFilePath, `\`, `\\`)
}
// 2. Write configuration file and start Filebeat
filebeat.WriteConfigFile(fmt.Sprintf(filestreamCleanInactiveCfg, logFilePath, tempDir))
filebeat.Start()
// 3. Create the log file
integration.WriteLogFile(t, logFilePath, 10, false)
// 4. Wait for Filebeat to start scanning for files
filebeat.WaitLogsContains(
fmt.Sprintf("A new file %s has been found", msgLogFilePath),
10*time.Second,
"Filebeat did not start looking for files to ingest")
filebeat.WaitLogsContains(
fmt.Sprintf("File is inactive. Closing. Path='%s'", msgLogFilePath),
10*time.Second,
"File did not became inactive")
filebeat.WaitLogsContains(
fmt.Sprintf("Closed reader. Path='%s'", msgLogFilePath),
10*time.Second, "Filebeat did not close the file")
// 5. Now that the reader has been closed, nothing is holding the state
// of the file, so once the TTL of its state expires and the store GC runs,
// it will be removed from the registry.
// Wait for the log message stating 1 entry has been removed from the registry
filebeat.WaitLogsContains("1 entries removed", 20*time.Second, "entry was not removed from registry")
// 6. Then assess it has been removed in the registry
registryFile := filepath.Join(filebeat.TempDir(), "data", "registry", "filebeat", "log.json")
filebeat.WaitFileContains(registryFile, `"op":"remove"`, time.Second)
}

@belimawr Added 'AkamaiRun' tests with store and other required pipeline mocks.

@ShourieG ShourieG requested a review from andrewkroh March 24, 2026 13:09
@ShourieG
Copy link
Copy Markdown
Contributor Author

@efd6, @andrewkroh, @belimawr, @rdner, @theletterf - We are pausing this PR atm and looking into making this into a native otel receiver.

@andrewkroh
Copy link
Copy Markdown
Member

Instead of adding an Akamai SIEM input into Filebeat, this is becoming an OpenTelemetry receiver that will be integrated into EDOT.

elastic/opentelemetry-collector-components#1119

@andrewkroh andrewkroh closed this May 11, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

docs enhancement Filebeat Filebeat new input (filebeat) A new input for file beat Team:Docs Label for the Observability docs team Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team Team:Security-Service Integrations Security Service Integrations Team

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants