Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add loki parser to parse servicePort from config #2437

Merged
merged 2 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/add-loki-parser.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
component: collector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add Loki parser to extract loki service port from config

# One or more tracking issues related to the change
issues: [1825]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
131 changes: 131 additions & 0 deletions internal/manifests/collector/parser/receiver/receiver_loki.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package receiver

import (
"fmt"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
"github.com/open-telemetry/opentelemetry-operator/internal/naming"
)

var _ parser.ComponentPortParser = &LokiReceiverParser{}

const (
parserNameLoki = "__loki"

defaultLokiGRPCPort int32 = 9095
defaultLokiHTTPPort int32 = 3100
)

// LokiReceiverParser parses the configuration for Loki receivers.
type LokiReceiverParser struct {
config map[interface{}]interface{}
logger logr.Logger
name string
}

// NewLokiReceiverParser builds a new parser for Loki receivers.
func NewLokiReceiverParser(logger logr.Logger, name string, config map[interface{}]interface{}) parser.ComponentPortParser {
if protocols, ok := config["protocols"].(map[interface{}]interface{}); ok {
return &LokiReceiverParser{
logger: logger,
name: name,
config: protocols,
}
}

return &LokiReceiverParser{
name: name,
config: map[interface{}]interface{}{},
}
}

// Ports returns all the service ports for all protocols in this parser.
func (o *LokiReceiverParser) Ports() ([]corev1.ServicePort, error) {
ports := []corev1.ServicePort{}

for _, protocol := range []struct {
name string
defaultPorts []corev1.ServicePort
}{
{
name: grpc,
defaultPorts: []corev1.ServicePort{
{
Name: naming.PortName(fmt.Sprintf("%s-grpc", o.name), defaultLokiGRPCPort),
Port: defaultLokiGRPCPort,
TargetPort: intstr.FromInt(int(defaultLokiGRPCPort)),
AppProtocol: &grpc,
},
},
},
{
name: http,
defaultPorts: []corev1.ServicePort{
{
Name: naming.PortName(fmt.Sprintf("%s-http", o.name), defaultLokiHTTPPort),
Port: defaultLokiHTTPPort,
TargetPort: intstr.FromInt(int(defaultLokiHTTPPort)),
AppProtocol: &http,
},
},
},
} {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the formatting here is a bit odd... why have another {?

Copy link
Contributor Author

@fbuetler fbuetler Dec 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where the actual for loop begins. Before that, it was only the array definition.
It is the same formatting as in the OTLP receiver

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i see, yeah i usually have only seen this in the test blocks so my brain didn't parse that correctly. Thanks for explaining!

// do we have the protocol specified at all?
if receiverProtocol, ok := o.config[protocol.name]; ok {
// we have the specified protocol, we definitely need a service port
nameWithProtocol := fmt.Sprintf("%s-%s", o.name, protocol.name)
var protocolPort *corev1.ServicePort

// do we have a configuration block for the protocol?
settings, ok := receiverProtocol.(map[interface{}]interface{})
if ok {
protocolPort = singlePortFromConfigEndpoint(o.logger, nameWithProtocol, settings)
}

// have we parsed a port based on the configuration block?
// if not, we use the default port
if protocolPort == nil {
ports = append(ports, protocol.defaultPorts...)
} else {
// infer protocol and appProtocol from protocol.name
if protocol.name == grpc {
protocolPort.Protocol = corev1.ProtocolTCP
protocolPort.AppProtocol = &grpc
} else if protocol.name == http {
protocolPort.Protocol = corev1.ProtocolTCP
protocolPort.AppProtocol = &http
}
ports = append(ports, *protocolPort)
}
}
}

return ports, nil
}

// ParserName returns the name of this parser.
func (o *LokiReceiverParser) ParserName() string {
return parserNameLoki
}

func init() {
Register("loki", NewLokiReceiverParser)
}
109 changes: 109 additions & 0 deletions internal/manifests/collector/parser/receiver/receiver_loki_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package receiver

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestLokiSelfRegisters(t *testing.T) {
// verify
assert.True(t, IsRegistered("loki"))
}

func TestLokiIsFoundByName(t *testing.T) {
// test
p, err := For(logger, "loki", map[interface{}]interface{}{})
assert.NoError(t, err)

// verify
assert.Equal(t, "__loki", p.ParserName())
}

func TestLokiPortsOverridden(t *testing.T) {
// prepare
builder := NewLokiReceiverParser(logger, "loki", map[interface{}]interface{}{
"protocols": map[interface{}]interface{}{
"grpc": map[interface{}]interface{}{
"endpoint": "0.0.0.0:1234",
},
"http": map[interface{}]interface{}{
"endpoint": "0.0.0.0:1235",
},
},
})

expectedResults := map[string]struct {
portNumber int32
seen bool
}{
"loki-grpc": {portNumber: 1234},
"loki-http": {portNumber: 1235},
}

// test
ports, err := builder.Ports()

// verify
assert.NoError(t, err)
assert.Len(t, ports, len(expectedResults))

for _, port := range ports {
r := expectedResults[port.Name]
r.seen = true
expectedResults[port.Name] = r
assert.EqualValues(t, r.portNumber, port.Port)
}
for k, v := range expectedResults {
assert.True(t, v.seen, "the port %s wasn't included in the service ports", k)
}
}

func TestLokiExposeDefaultPorts(t *testing.T) {
// prepare
builder := NewLokiReceiverParser(logger, "loki", map[interface{}]interface{}{
"protocols": map[interface{}]interface{}{
"grpc": map[interface{}]interface{}{},
"http": map[interface{}]interface{}{},
},
})

expectedResults := map[string]struct {
portNumber int32
seen bool
}{
"loki-grpc": {portNumber: 9095},
"loki-http": {portNumber: 3100},
}

// test
ports, err := builder.Ports()

// verify
assert.NoError(t, err)
assert.Len(t, ports, len(expectedResults))

for _, port := range ports {
r := expectedResults[port.Name]
r.seen = true
expectedResults[port.Name] = r
assert.EqualValues(t, r.portNumber, port.Port)
}
for k, v := range expectedResults {
assert.True(t, v.seen, "the port %s wasn't included in the service ports", k)
}
}
Loading