
x-pack/metricbeat/module/openai: Add new module #41516

Open

wants to merge 29 commits into base: main

Conversation

shmsr
Member

@shmsr shmsr commented Nov 4, 2024

Proposed commit message

Implement a new module for OpenAI usage collection. The module queries https://api.openai.com/v1/usage (by default; the URL is configurable, e.g. for proxies) and collects the limited set of usage metrics emitted by this undocumented endpoint.

Example of how the usage endpoint emits metrics:

Given timestamps t0, t1, t2, ... tn in ascending order:

  • At t0 (first collection):
   usage_metrics_1: *
  • At t1 (after new API usage):
   usage_metrics_1: *
   usage_metrics_2: *
  • At t2 (continuous collection):
   usage_metrics_1: *
   usage_metrics_2: *
   usage_metrics_3: *

and so on.
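The cumulative emission pattern above is why naive repeated collection produces duplicates. Here is a minimal Go sketch of deduplicating by (aggregation_timestamp, snapshot_id); the type and function names are illustrative, not the module's actual code:

```go
package main

import "fmt"

// usageRecord is a pared-down view of one element of the endpoint's
// "data" array; fields follow the example response below.
type usageRecord struct {
	AggregationTimestamp int64
	SnapshotID           string
	NRequests            int
}

// dedupe drops records already seen in a previous collection, keyed by
// (aggregation_timestamp, snapshot_id). It illustrates why repeated
// real-time collections of a cumulative endpoint need deduplication,
// not how the module implements it.
func dedupe(seen map[string]bool, records []usageRecord) []usageRecord {
	var fresh []usageRecord
	for _, r := range records {
		key := fmt.Sprintf("%d/%s", r.AggregationTimestamp, r.SnapshotID)
		if seen[key] {
			continue
		}
		seen[key] = true
		fresh = append(fresh, r)
	}
	return fresh
}

func main() {
	seen := make(map[string]bool)
	t0 := []usageRecord{{1725389580, "gpt-4o-mini-2024-07-18", 1}}
	t1 := append(t0, usageRecord{1725389640, "gpt-4o-mini-2024-07-18", 1})
	fmt.Println(len(dedupe(seen, t0))) // 1: first collection, all records new
	fmt.Println(len(dedupe(seen, t1))) // 1: only the newly appeared record survives
}
```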

Example response:

{
  "object": "list",
  "data": [
    {
      "organization_id": "org-xxx",
      "organization_name": "Personal",
      "aggregation_timestamp": 1725389580,
      "n_requests": 1,
      "operation": "completion",
      "snapshot_id": "gpt-4o-mini-2024-07-18",
      "n_context_tokens_total": 62,
      "n_generated_tokens_total": 21,
      "email": null,
      "api_key_id": null,
      "api_key_name": null,
      "api_key_redacted": null,
      "api_key_type": null,
      "project_id": null,
      "project_name": null,
      "request_type": ""
    },
    {
      "organization_id": "org-xxx",
      "organization_name": "Personal",
      "aggregation_timestamp": 1725389640,
      "n_requests": 1,
      "operation": "completion",
      "snapshot_id": "gpt-4o-mini-2024-07-18",
      "n_context_tokens_total": 97,
      "n_generated_tokens_total": 17,
      "email": null,
      "api_key_id": null,
      "api_key_name": null,
      "api_key_redacted": null,
      "api_key_type": null,
      "project_id": null,
      "project_name": null,
      "request_type": ""
    }
  ],
  "tpm_data": [
    {
      "organization_id": "org-xxx",
      "organization_name": "Personal",
      "day_timestamp": 1725321600,
      "snapshot_id": "gpt-4o-mini-2024-07-18",
      "operation": "completion",
      "p90_context_tpm": 97,
      "p90_generated_tpm": 21,
      "p90_provisioned_context_tpm": 0,
      "p90_provisioned_generated_tpm": 0,
      "max_context_tpm": 97,
      "max_generated_tpm": 21,
      "max_provisioned_context_tpm": 0,
      "max_provisioned_generated_tpm": 0
    }
  ],
  "ft_data": [],
  "dalle_api_data": [],
  "whisper_api_data": [],
  "tts_api_data": [],
  "assistant_code_interpreter_data": [],
  "retrieval_storage_data": []
}

Usage metrics appear at the endpoint shortly after the API is used. So if the module collected in real time, multiple times a day, it would ingest duplicates, which is bad for both storage and analytics of the usage data.

It's better to collect time.Now() (in UTC) - 24h, so that we get the full usage for the past day (in UTC) and avoid duplication. That's why I have introduced a realtime config option, defaulting to false: collection is delayed by 24h, so we get complete daily data. realtime: true behaves like any other collection, fetching metrics at set intervals. Our recommendation is to keep realtime: false.

As this is a Metricbeat module, there is no existing package that supports storing a cursor. So, to avoid re-pulling data we have already fetched, timestamps are stored per API key; the storage logic is documented in code comments. We use new custom code to persist this state, so collection can resume from the next available date.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Author's Checklist

  • Check the state store
  • Validate with usage dashboard of OpenAI

How to test this PR locally

  • Run Metricbeat with your OpenAI API key configured to collect usage metrics.

@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Nov 4, 2024
@mergify mergify bot assigned shmsr Nov 4, 2024
Contributor

mergify bot commented Nov 4, 2024

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @shmsr? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fix up this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit

Contributor

mergify bot commented Nov 4, 2024

backport-8.x has been added to help with the transition to the new branch 8.x.
If you don't need it, please add the backport-skip label and remove the backport-8.x label.

@mergify mergify bot added the backport-8.x Automated backport to the 8.x branch with mergify label Nov 4, 2024
@shmsr shmsr added Module: openai Team:Obs-InfraObs Label for the Observability Infrastructure Monitoring team labels Nov 4, 2024
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Nov 4, 2024
@shmsr shmsr requested a review from a team November 5, 2024 07:51
@shmsr
Member Author

shmsr commented Nov 5, 2024

Getting hit by this error: #41174 (comment), hence the CI is failing. Everything else is okay.

@shmsr
Member Author

shmsr commented Nov 6, 2024

To continue my testing and avoid the Limit of total fields [10000] has been exceeded error until it is fixed for 8.15.x and older, I am using setup.template.fields, where I point to a new fields file that contains only the ECS fields and the openai fields from fields.yml and nothing else.

See this: https://www.elastic.co/guide/en/beats/metricbeat/current/configuration-template.html

So this has unblocked me for now, but we definitely need a proper fix for this.

@shmsr shmsr marked this pull request as ready for review November 12, 2024 07:28
@shmsr shmsr requested a review from a team as a code owner November 12, 2024 07:28
@pierrehilbert pierrehilbert added the Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team label Nov 12, 2024
@elasticmachine
Collaborator

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@shmsr
Member Author

shmsr commented Nov 12, 2024

I've explained the complicated collection mechanism in the PR description itself. The rest is self-explanatory from the code. Please let me know if anything needs further clarification.

}
],
"ft_data": [],
"dalle_api_data": [],
Member

It'd be good to have data for each data set

Member Author

Yeah, I tried generating ft (fine-tuning) data but it doesn't seem to work. As OpenAI provides this API as undocumented, I couldn't find a single source with any samples. Not even sure they even populate for the response of this particular endpoint. For dalle_api_data, I'll add.

Member

@AndersonQ AndersonQ left a comment

You forgot to revert the changes you made to the default config file

return fmt.Errorf("error making request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("error response from API: %s", resp.Status)
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("error response from API: status=%s, body=%s", resp.Status, string(body))
Member

Be careful here not to leak sensitive information. I usually log the body in its own log entry rather than including it in the error message.

Member Author

I was testing exactly this just this morning, and I found one case where logging the body is not ideal. With an invalid API key, the body echoes the key that failed to authenticate with OpenAI, so if there's a typo when entering the API key, most of the key is exposed. I'll remove this to avoid leaking any sensitive information.

Member

@AndersonQ AndersonQ left a comment

Sorry, I spotted a wee issue in how you create the HTTP client and measure how long the request takes. As soon as it's fixed, it's good to go.

Comment on lines 30 to 34
err := c.Ratelimiter.Wait(req.Context())
waitDuration := time.Since(start)
if err != nil {
return nil, err
}
Member

[NIT|Go convention]

Suggested change
err := c.Ratelimiter.Wait(req.Context())
waitDuration := time.Since(start)
if err != nil {
return nil, err
}
err := c.Ratelimiter.Wait(req.Context())
if err != nil {
return nil, err
}
waitDuration := time.Since(start)

Comment on lines 44 to 45
var client = http.DefaultClient
client.Timeout = timeout
Member

[Blocker]
You should not change the default client; you should create a new one.

var client = http.DefaultClient makes client reference http.DefaultClient, so any change to client takes effect for everyone using http.DefaultClient

Suggested change
var client = http.DefaultClient
client.Timeout = timeout
client := http.Client{Timeout: timeout}

Member Author

This is a very good point. Thanks.

@ishleenk17
Contributor

@shmsr : Started reviewing the module. Can you please link the document where this approach and the metrics we are collecting were decided?

@shmsr
Member Author

shmsr commented Nov 27, 2024

@shmsr : Started reviewing the module. Can you please link the document where this approach and the metrics we are collecting were decided?

As it is a private repo, I've reverse-linked the issue as per the recommendation. The research there also shows which API to use. The decision on how to process the metrics (more implementation-specific details) is laid out in the description itself: #41516 (comment); @muthu-mps and I decided it on a call.

@shmsr
Member Author

shmsr commented Nov 27, 2024

Thanks @AndersonQ. Very helpful review comments; appreciate it! ✨

"api_key_redacted": null,
"api_key_type": null,
"email": null,
"n_cached_context_tokens_total": 0,
Contributor

@shmsr , @ishleenk17 - Are we okay with the field names? IMO, updating them to something like
n_context_tokens_total -> tokens.total or tokens_total would improve usability. On the other hand, keeping them as-is makes it easy to relate the ES fields to the OpenAI API metrics. WDYT?

Contributor

Yeah, we should keep the fields readable and in line with other LLM integrations.

Member Author

I was thinking of keeping them like this here and renaming them in the ingest pipelines in Integrations. What do you think?

"time"
)

type Config struct {
Contributor

Although not strictly necessary, the name Config sounds a little vague and doesn't clearly convey its purpose. Consider renaming it for clarity.

Member Author

Do you have a suggestion? Config is named like that in many modules, so I kept it consistent.

Search for type Config struct (case-insensitive search) in metricbeat/module/*

if len(parts) != 2 {
continue
}
headersMap[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
Contributor

Nit: Extract them separately for clarity.
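One way to apply this nit, with each trimmed part extracted into its own named variable; parseHeaders is an illustrative stand-in for the module's parsing code:

```go
package main

import (
	"fmt"
	"strings"
)

// parseHeaders splits "Key: Value" strings into a map. The key and value
// are extracted into named variables, as the reviewer suggests, instead
// of indexing parts inline on the assignment line.
func parseHeaders(lines []string) map[string]string {
	headersMap := make(map[string]string, len(lines))
	for _, line := range lines {
		parts := strings.SplitN(line, ":", 2)
		if len(parts) != 2 {
			continue // skip malformed entries without a colon
		}
		key := strings.TrimSpace(parts[0])
		value := strings.TrimSpace(parts[1])
		headersMap[key] = value
	}
	return headersMap
}

func main() {
	fmt.Println(parseHeaders([]string{"Authorization: Bearer token", "bad-line"}))
}
```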

"project_name": usage.ProjectName,
"request_type": usage.RequestType,
"n_cached_context_tokens_total": usage.NCachedContextTokensTotal,
},
Contributor

Does this API provide a breakdown of input and output tokens?
If yes, we can include that too.

Member Author

I didn't get this. Can you please elaborate?

}
}
if c.Timeout <= 0 {
errs = append(errs, errors.New("timeout must be greater than 0"))
Contributor

Can this happen, given we always have a default value for it?

Member Author

Ideally no, because of the default. I kept it there as a one-time check, to avoid issues if someone someday changes the default to 0 (or lower).

Member Author

@shmsr shmsr Nov 28, 2024

Oh, yes: also in the case where the user has configured the timeout as 0 or lower.

APIURL: "https://api.openai.com/v1/usage",
Timeout: 30 * time.Second,
RateLimit: &rateLimitConfig{
Limit: ptr(60),
Contributor

What defines these limits?
Could you point me to the documentation behind these chosen defaults?

Member Author

@shmsr shmsr Nov 28, 2024

The timeout is a sane default, and the rate limit was decided through testing. If you look at the meta ticket, you'll find an issue where we researched this API's rate limits. I'm not linking it here as it's private; I can share it over Slack.

}

// Clear removes all state files by deleting and recreating the state directory
func (s *stateStore) Clear() error {
Contributor

If our intent is just to remove the files, should we delete only the files instead of deleting and recreating the directory?
It might avoid potential permission issues or race conditions. WDYT?

Eg:

files, err := os.ReadDir(s.Dir)
if err != nil {
    return fmt.Errorf("reading state directory: %w", err)
}

for _, file := range files {
    if err := os.Remove(path.Join(s.Dir, file.Name())); err != nil {
        return fmt.Errorf("removing state file: %w", err)
    }
}

Member Author

Actually, when I wrote this I intended it to be a package that could be reused later, and this method was intended to completely reset the state. The earlier plan was to add subdirectories inside the state folder in case we add a new dataset in the future; for now we just have "usage".

But as it resides only in this package, we are not even using this method; it is unused, and we could remove it.

So that's the reason: it is a complete reset of the state.

return nil, errors.New("empty path provided")
}

store, err := newStateStore(storePath)
Contributor

Do we need to check whether the storePath actually exists?

Member Author

We create the storePath ourselves. Part of it exists already, and the rest we create with MkdirAll.

Member Author

See this: newStateManager(paths.Resolve(paths.Data, path.Join("state", base.Module().Name(), base.Name())))

paths.Data is supplied by Agent using the --paths.data flag. So even if the directory doesn't exist, we create it.
