Skip to content

Commit

Permalink
add loki parser to parse servicePort from config
Browse files Browse the repository at this point in the history
  • Loading branch information
fbuetler committed Dec 13, 2023
1 parent 0ce68dc commit a8c4f7f
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 0 deletions.
16 changes: 16 additions & 0 deletions .chloggen/add-loki-parser.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
component: collector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add Loki parser to extract loki service port from config

# One or more tracking issues related to the change
issues: [1825]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
131 changes: 131 additions & 0 deletions internal/manifests/collector/parser/receiver/receiver_loki.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package receiver

import (
"fmt"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
"github.com/open-telemetry/opentelemetry-operator/internal/naming"
)

var _ parser.ComponentPortParser = &LokiReceiverParser{}

const (
parserNameLoki = "__loki"

defaultLokiGRPCPort int32 = 3500
defaultLokiHTTPPort int32 = 3600
)

// LokiReceiverParser parses the configuration for Loki receivers.
type LokiReceiverParser struct {
config map[interface{}]interface{}
logger logr.Logger
name string
}

// NewLokiReceiverParser builds a new parser for Loki receivers.
func NewLokiReceiverParser(logger logr.Logger, name string, config map[interface{}]interface{}) parser.ComponentPortParser {
if protocols, ok := config["protocols"].(map[interface{}]interface{}); ok {
return &LokiReceiverParser{
logger: logger,
name: name,
config: protocols,
}
}

return &LokiReceiverParser{
name: name,
config: map[interface{}]interface{}{},
}
}

// Ports returns all the service ports for all protocols in this parser.
func (o *LokiReceiverParser) Ports() ([]corev1.ServicePort, error) {
ports := []corev1.ServicePort{}

for _, protocol := range []struct {
name string
defaultPorts []corev1.ServicePort
}{
{
name: grpc,
defaultPorts: []corev1.ServicePort{
{
Name: naming.PortName(fmt.Sprintf("%s-grpc", o.name), defaultLokiGRPCPort),
Port: defaultLokiGRPCPort,
TargetPort: intstr.FromInt(int(defaultLokiGRPCPort)),
AppProtocol: &grpc,
},
},
},
{
name: http,
defaultPorts: []corev1.ServicePort{
{
Name: naming.PortName(fmt.Sprintf("%s-http", o.name), defaultLokiHTTPPort),
Port: defaultLokiHTTPPort,
TargetPort: intstr.FromInt(int(defaultLokiHTTPPort)),
AppProtocol: &http,
},
},
},
} {
// do we have the protocol specified at all?
if receiverProtocol, ok := o.config[protocol.name]; ok {
// we have the specified protocol, we definitely need a service port
nameWithProtocol := fmt.Sprintf("%s-%s", o.name, protocol.name)
var protocolPort *corev1.ServicePort

// do we have a configuration block for the protocol?
settings, ok := receiverProtocol.(map[interface{}]interface{})
if ok {
protocolPort = singlePortFromConfigEndpoint(o.logger, nameWithProtocol, settings)
}

// have we parsed a port based on the configuration block?
// if not, we use the default port
if protocolPort == nil {
ports = append(ports, protocol.defaultPorts...)
} else {
// infer protocol and appProtocol from protocol.name
if protocol.name == grpc {
protocolPort.Protocol = corev1.ProtocolTCP
protocolPort.AppProtocol = &grpc
} else if protocol.name == http {
protocolPort.Protocol = corev1.ProtocolTCP
protocolPort.AppProtocol = &http
}
ports = append(ports, *protocolPort)
}
}
}

return ports, nil
}

// ParserName returns the name of this parser.
func (o *LokiReceiverParser) ParserName() string {
return parserNameLoki
}

func init() {
Register("loki", NewLokiReceiverParser)
}
109 changes: 109 additions & 0 deletions internal/manifests/collector/parser/receiver/receiver_loki_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package receiver

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestLokiSelfRegisters(t *testing.T) {
// verify
assert.True(t, IsRegistered("loki"))
}

func TestLokiIsFoundByName(t *testing.T) {
// test
p, err := For(logger, "loki", map[interface{}]interface{}{})
assert.NoError(t, err)

// verify
assert.Equal(t, "__loki", p.ParserName())
}

func TestLokiPortsOverridden(t *testing.T) {
// prepare
builder := NewLokiReceiverParser(logger, "loki", map[interface{}]interface{}{
"protocols": map[interface{}]interface{}{
"grpc": map[interface{}]interface{}{
"endpoint": "0.0.0.0:1234",
},
"http": map[interface{}]interface{}{
"endpoint": "0.0.0.0:1235",
},
},
})

expectedResults := map[string]struct {
portNumber int32
seen bool
}{
"loki-grpc": {portNumber: 1234},
"loki-http": {portNumber: 1235},
}

// test
ports, err := builder.Ports()

// verify
assert.NoError(t, err)
assert.Len(t, ports, len(expectedResults))

for _, port := range ports {
r := expectedResults[port.Name]
r.seen = true
expectedResults[port.Name] = r
assert.EqualValues(t, r.portNumber, port.Port)
}
for k, v := range expectedResults {
assert.True(t, v.seen, "the port %s wasn't included in the service ports", k)
}
}

func TestLokiExposeDefaultPorts(t *testing.T) {
// prepare
builder := NewLokiReceiverParser(logger, "loki", map[interface{}]interface{}{
"protocols": map[interface{}]interface{}{
"grpc": map[interface{}]interface{}{},
"http": map[interface{}]interface{}{},
},
})

expectedResults := map[string]struct {
portNumber int32
seen bool
}{
"loki-grpc": {portNumber: 3500},
"loki-http": {portNumber: 3600},
}

// test
ports, err := builder.Ports()

// verify
assert.NoError(t, err)
assert.Len(t, ports, len(expectedResults))

for _, port := range ports {
r := expectedResults[port.Name]
r.seen = true
expectedResults[port.Name] = r
assert.EqualValues(t, r.portNumber, port.Port)
}
for k, v := range expectedResults {
assert.True(t, v.seen, "the port %s wasn't included in the service ports", k)
}
}

0 comments on commit a8c4f7f

Please sign in to comment.