Skip to content

Commit

Permalink
[receiver/azureeventhub] support parsing more time format (open-telem…
Browse files Browse the repository at this point in the history
…etry#36762)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

Support parsing additional time formats through configuration, instead of
only ISO 8601.

<!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes open-telemetry#36650 

<!--Describe what testing was performed and which tests were added.-->
#### Testing

- Added unit test for time parsing function
- Validated locally by sending data using the time format from the issue and
confirming it was received successfully

<!--Describe the documentation added.-->
#### Documentation

Add some new configurations, including `time_format` and `time_offset`.

```yaml
receivers:
  azureeventhub:
    connection: "xxxxx"
    format: "azure"
    # optional
    time_format:
      # Supported time formats. Defaults to an empty list, in which case the existing ISO 8601 parser is used. Layouts follow https://pkg.go.dev/time#Layout. Timestamps without time-zone information are interpreted as UTC.
      logs: ["01/02/2006 15:04:05","2006-01-02 15:04:05","2006-01-02T15:04:05Z07:00"]
      metrics: [""]
      traces: [""]
    # optional
    time_offset:
      # Offset, in hours, applied to the parsed time. Mainly for time strings that carry no time-zone information. Default is 0.
      logs: -8
      metrics: +8
      traces: -8
```

<!--Please delete paragraphs that you did not use before submitting.-->

---------

Co-authored-by: Antoine Toulme <[email protected]>
  • Loading branch information
hgaol and atoulme authored Dec 16, 2024
1 parent f8cfe87 commit f72ef07
Show file tree
Hide file tree
Showing 12 changed files with 163 additions and 52 deletions.
27 changes: 27 additions & 0 deletions .chloggen/36650.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: azureeventhubreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: support providing one or more time formats for timestamp parsing

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36650]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
32 changes: 21 additions & 11 deletions pkg/translator/azure/resourcelogs_to_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"errors"
"strconv"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/relvacode/iso8601"
Expand Down Expand Up @@ -71,8 +72,9 @@ type azureLogRecord struct {
var _ plog.Unmarshaler = (*ResourceLogsUnmarshaler)(nil)

type ResourceLogsUnmarshaler struct {
Version string
Logger *zap.Logger
Version string
Logger *zap.Logger
TimeFormats []string
}

func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) {
Expand Down Expand Up @@ -105,7 +107,7 @@ func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) {

for i := 0; i < len(logs); i++ {
log := logs[i]
nanos, err := getTimestamp(log)
nanos, err := getTimestamp(log, r.TimeFormats...)
if err != nil {
r.Logger.Warn("Unable to convert timestamp from log", zap.String("timestamp", log.Time))
continue
Expand All @@ -129,11 +131,11 @@ func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) {
return l, nil
}

func getTimestamp(record azureLogRecord) (pcommon.Timestamp, error) {
func getTimestamp(record azureLogRecord, formats ...string) (pcommon.Timestamp, error) {
if record.Time != "" {
return asTimestamp(record.Time)
return asTimestamp(record.Time, formats...)
} else if record.Timestamp != "" {
return asTimestamp(record.Timestamp)
return asTimestamp(record.Timestamp, formats...)
}

return 0, errMissingTimestamp
Expand All @@ -142,13 +144,21 @@ func getTimestamp(record azureLogRecord) (pcommon.Timestamp, error) {
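// asTimestamp parses a timestamp string into an OpenTelemetry
// nanosecond timestamp. Each of the provided formats (Go time layouts)
// is tried in order; if none matches, it falls back to ISO 8601 parsing.
// If the string cannot be parsed, it returns zero and the error.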
func asTimestamp(s string) (pcommon.Timestamp, error) {
t, err := iso8601.ParseString(s)
if err != nil {
return 0, err
func asTimestamp(s string, formats ...string) (pcommon.Timestamp, error) {
var err error
var t time.Time
// Try parsing with provided formats first
for _, format := range formats {
if t, err = time.Parse(format, s); err == nil {
return pcommon.Timestamp(t.UnixNano()), nil
}
}

return pcommon.Timestamp(t.UnixNano()), nil
// Fallback to ISO 8601 parsing if no format matches
if t, err = iso8601.ParseString(s); err == nil {
return pcommon.Timestamp(t.UnixNano()), nil
}
return 0, err
}

// asSeverity converts the Azure log level to equivalent
Expand Down
19 changes: 18 additions & 1 deletion pkg/translator/azure/resourcelogs_to_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,25 @@ func TestAsTimestamp(t *testing.T) {
assert.NoError(t, err)
assert.Less(t, pcommon.Timestamp(0), nanos)

timestamp = "11/20/2024 13:57:18"
nanos, err = asTimestamp(timestamp, "01/02/2006 15:04:05")
assert.NoError(t, err)
assert.Less(t, pcommon.Timestamp(0), nanos)

// time_format set, but fallback to iso8601 and succeeded to parse
timestamp = "2022-11-11T04:48:27.6767145Z"
nanos, err = asTimestamp(timestamp, "01/02/2006 15:04:05")
assert.NoError(t, err)
assert.Less(t, pcommon.Timestamp(0), nanos)

// time_format set, but all failed to parse
timestamp = "11/20/2024 13:57:18"
nanos, err = asTimestamp(timestamp, "2006-01-02 15:04:05")
assert.Error(t, err)
assert.Equal(t, pcommon.Timestamp(0), nanos)

timestamp = "invalid-time"
nanos, err = asTimestamp(timestamp)
nanos, err = asTimestamp(timestamp, nil...)
assert.Error(t, err)
assert.Equal(t, pcommon.Timestamp(0), nanos)
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/translator/azure/resources_to_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ type azureTracesRecord struct {
var _ ptrace.Unmarshaler = (*TracesUnmarshaler)(nil)

type TracesUnmarshaler struct {
Version string
Logger *zap.Logger
Version string
Logger *zap.Logger
TimeFormats []string
}

func (r TracesUnmarshaler) UnmarshalTraces(buf []byte) (ptrace.Traces, error) {
Expand Down Expand Up @@ -95,7 +96,7 @@ func (r TracesUnmarshaler) UnmarshalTraces(buf []byte) (ptrace.Traces, error) {

resource.Attributes().PutStr("service.name", azureTrace.AppRoleName)

nanos, err := asTimestamp(azureTrace.Time)
nanos, err := asTimestamp(azureTrace.Time, r.TimeFormats...)
if err != nil {
r.Logger.Warn("Invalid Timestamp", zap.String("time", azureTrace.Time))
continue
Expand Down
33 changes: 21 additions & 12 deletions pkg/translator/azurelogs/resourcelogs_to_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"errors"
"strconv"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/relvacode/iso8601"
Expand Down Expand Up @@ -44,7 +45,6 @@ const (

var errMissingTimestamp = errors.New("missing timestamp")

// azureRecords represents an array of Azure log records
// as exported via an Azure Event Hub
type azureRecords struct {
Records []azureLogRecord `json:"records"`
Expand Down Expand Up @@ -76,8 +76,9 @@ type azureLogRecord struct {
var _ plog.Unmarshaler = (*ResourceLogsUnmarshaler)(nil)

type ResourceLogsUnmarshaler struct {
Version string
Logger *zap.Logger
Version string
Logger *zap.Logger
TimeFormats []string
}

func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) {
Expand Down Expand Up @@ -109,7 +110,7 @@ func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) {

for i := 0; i < len(logs); i++ {
log := logs[i]
nanos, err := getTimestamp(log)
nanos, err := getTimestamp(log, r.TimeFormats...)
if err != nil {
r.Logger.Warn("Unable to convert timestamp from log", zap.String("timestamp", log.Time))
continue
Expand Down Expand Up @@ -137,11 +138,11 @@ func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) {
return l, nil
}

func getTimestamp(record azureLogRecord) (pcommon.Timestamp, error) {
func getTimestamp(record azureLogRecord, formats ...string) (pcommon.Timestamp, error) {
if record.Time != "" {
return asTimestamp(record.Time)
return asTimestamp(record.Time, formats...)
} else if record.Timestamp != "" {
return asTimestamp(record.Timestamp)
return asTimestamp(record.Timestamp, formats...)
}

return 0, errMissingTimestamp
Expand All @@ -150,13 +151,21 @@ func getTimestamp(record azureLogRecord) (pcommon.Timestamp, error) {
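// asTimestamp parses a timestamp string into an OpenTelemetry
// nanosecond timestamp. Each of the provided formats (Go time layouts)
// is tried in order; if none matches, it falls back to ISO 8601 parsing.
// If the string cannot be parsed, it returns zero and the error.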
func asTimestamp(s string) (pcommon.Timestamp, error) {
t, err := iso8601.ParseString(s)
if err != nil {
return 0, err
func asTimestamp(s string, formats ...string) (pcommon.Timestamp, error) {
var err error
var t time.Time
// Try parsing with provided formats first
for _, format := range formats {
if t, err = time.Parse(format, s); err == nil {
return pcommon.Timestamp(t.UnixNano()), nil
}
}

return pcommon.Timestamp(t.UnixNano()), nil
// Fallback to ISO 8601 parsing if no format matches
if t, err = iso8601.ParseString(s); err == nil {
return pcommon.Timestamp(t.UnixNano()), nil
}
return 0, err
}

// asSeverity converts the Azure log level to equivalent
Expand Down
17 changes: 17 additions & 0 deletions pkg/translator/azurelogs/resourcelogs_to_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,23 @@ func TestAsTimestamp(t *testing.T) {
assert.NoError(t, err)
assert.Less(t, pcommon.Timestamp(0), nanos)

timestamp = "11/20/2024 13:57:18"
nanos, err = asTimestamp(timestamp, "01/02/2006 15:04:05")
assert.NoError(t, err)
assert.Less(t, pcommon.Timestamp(0), nanos)

// time_format set, but fallback to iso8601 and succeeded to parse
timestamp = "2022-11-11T04:48:27.6767145Z"
nanos, err = asTimestamp(timestamp, "01/02/2006 15:04:05")
assert.NoError(t, err)
assert.Less(t, pcommon.Timestamp(0), nanos)

// time_format set, but all failed to parse
timestamp = "11/20/2024 13:57:18"
nanos, err = asTimestamp(timestamp, "2006-01-02 15:04:05")
assert.Error(t, err)
assert.Equal(t, pcommon.Timestamp(0), nanos)

timestamp = "invalid-time"
nanos, err = asTimestamp(timestamp)
assert.Error(t, err)
Expand Down
11 changes: 11 additions & 0 deletions receiver/azureeventhubreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ attribute names are copied without any changes.

Default: `false` (semantic conventions are not applied)

### time_formats (optional)

Time formats to try when parsing timestamps for logs, metrics and traces. Each format is a Go time layout (see https://pkg.go.dev/time#Layout). Timestamps without time-zone information are interpreted as UTC. If none of the configured formats matches, the timestamp is parsed as ISO 8601. Default is `nil` (unset), which means only the ISO 8601 parser is used.

Default: `nil`

### Example Configuration

```yaml
Expand All @@ -60,6 +66,11 @@ receivers:
group: bar
offset: "1234-5566"
format: "azure"
# optional
time_formats:
# Supported time formats. Defaults to an empty list, in which case the existing ISO 8601 parser is used. Layouts follow https://pkg.go.dev/time#Layout. Timestamps without time-zone information are interpreted as UTC.
logs: ["01/02/2006 15:04:05","2006-01-02 15:04:05","2006-01-02T15:04:05Z07:00"]
metrics: ["01/02/2006 15:04:05"]
```
This component can persist its state using the [storage extension].
Expand Down
12 changes: 7 additions & 5 deletions receiver/azureeventhubreceiver/azureresourcelogs_unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@ type AzureResourceLogsEventUnmarshaler struct {
unmarshaler logsUnmarshaler
}

func newAzureResourceLogsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger, applySemanticConventions bool) eventLogsUnmarshaler {
func newAzureResourceLogsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger, applySemanticConventions bool, timeFormat []string) eventLogsUnmarshaler {
if applySemanticConventions {
return AzureResourceLogsEventUnmarshaler{
unmarshaler: &azurelogs.ResourceLogsUnmarshaler{
Version: buildInfo.Version,
Logger: logger,
Version: buildInfo.Version,
Logger: logger,
TimeFormats: timeFormat,
},
}
}
return AzureResourceLogsEventUnmarshaler{
unmarshaler: &azure.ResourceLogsUnmarshaler{
Version: buildInfo.Version,
Logger: logger,
Version: buildInfo.Version,
Logger: logger,
TimeFormats: timeFormat,
},
}
}
Expand Down
37 changes: 23 additions & 14 deletions receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver/internal/metadata"
)

const (
azureResourceID = "azure.resource.id"
)
const azureResourceID = "azure.resource.id"

type azureResourceMetricsUnmarshaler struct {
buildInfo component.BuildInfo
logger *zap.Logger
buildInfo component.BuildInfo
logger *zap.Logger
TimeFormat []string
}

// azureMetricRecords represents an array of Azure metric records
Expand All @@ -50,10 +49,11 @@ type azureMetricRecord struct {
Average float64 `json:"average"`
}

func newAzureResourceMetricsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger) eventMetricsUnmarshaler {
func newAzureResourceMetricsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger, timeFormat []string) eventMetricsUnmarshaler {
return azureResourceMetricsUnmarshaler{
buildInfo: buildInfo,
logger: logger,
buildInfo: buildInfo,
logger: logger,
TimeFormat: timeFormat,
}
}

Expand Down Expand Up @@ -90,7 +90,7 @@ func (r azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *eventhub.Event)
resourceID = azureMetric.ResourceID
}

nanos, err := asTimestamp(azureMetric.Time)
nanos, err := asTimestamp(azureMetric.Time, r.TimeFormat)
if err != nil {
r.logger.Warn("Invalid Timestamp", zap.String("time", azureMetric.Time))
continue
Expand Down Expand Up @@ -152,10 +152,19 @@ func (r azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *eventhub.Event)
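// asTimestamp parses a timestamp string into an OpenTelemetry
// nanosecond timestamp. Each of the provided formats (Go time layouts)
// is tried in order; if none matches, it falls back to ISO 8601 parsing.
// If the string cannot be parsed, it returns zero and the error.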
func asTimestamp(s string) (pcommon.Timestamp, error) {
t, err := iso8601.ParseString(s)
if err != nil {
return 0, err
func asTimestamp(s string, formats []string) (pcommon.Timestamp, error) {
var err error
var t time.Time
// Try parsing with provided formats first
for _, format := range formats {
if t, err = time.Parse(format, s); err == nil {
return pcommon.Timestamp(t.UnixNano()), nil
}
}

// Fallback to ISO 8601 parsing if no format matches
if t, err = iso8601.ParseString(s); err == nil {
return pcommon.Timestamp(t.UnixNano()), nil
}
return pcommon.Timestamp(t.UnixNano()), nil
return 0, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ type azureTracesEventUnmarshaler struct {
unmarshaler *azure.TracesUnmarshaler
}

func newAzureTracesUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger) eventTracesUnmarshaler {
func newAzureTracesUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger, timeFormat []string) eventTracesUnmarshaler {
return azureTracesEventUnmarshaler{
unmarshaler: &azure.TracesUnmarshaler{
Version: buildInfo.Version,
Logger: logger,
Version: buildInfo.Version,
Logger: logger,
TimeFormats: timeFormat,
},
}
}
Expand Down
7 changes: 7 additions & 0 deletions receiver/azureeventhubreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ type Config struct {
Format string `mapstructure:"format"`
ConsumerGroup string `mapstructure:"group"`
ApplySemanticConventions bool `mapstructure:"apply_semantic_conventions"`
TimeFormats TimeFormat `mapstructure:"time_formats"`
}

type TimeFormat struct {
Logs []string `mapstructure:"logs"`
Metrics []string `mapstructure:"metrics"`
Traces []string `mapstructure:"traces"`
}

func isValidFormat(format string) bool {
Expand Down
Loading

0 comments on commit f72ef07

Please sign in to comment.