Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/cmd/integration-tests/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ services:
- integration-tests

loki:
image: grafana/loki:latest
image: grafana/loki:3.5.8
command: -config.file=/etc/loki/local-config.yaml
ports:
- "3100:3100"
Expand Down
173 changes: 73 additions & 100 deletions internal/component/loki/source/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestLokiSourceAPI_Simple(t *testing.T) {
receiver := fake.NewClient(func() {})
defer receiver.Stop()

args := testArgsWith(t, func(a *Arguments) {
args := testArgsWith(func(a *Arguments) {
a.Server.HTTP.ListenPort = 8532
a.ForwardTo = []loki.LogsReceiver{receiver.LogsReceiver()}
a.UseIncomingTimestamp = true
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestLokiSourceAPI_Update(t *testing.T) {
receiver := fake.NewClient(func() {})
defer receiver.Stop()

args := testArgsWith(t, func(a *Arguments) {
args := testArgsWith(func(a *Arguments) {
a.Server.HTTP.ListenPort = 8583
a.ForwardTo = []loki.LogsReceiver{receiver.LogsReceiver()}
a.UseIncomingTimestamp = true
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestLokiSourceAPI_FanOut(t *testing.T) {
receivers[i] = fake.NewClient(func() {})
}

args := testArgsWith(t, func(a *Arguments) {
args := testArgsWith(func(a *Arguments) {
a.Server.HTTP.ListenPort = 8537
a.ForwardTo = mapToChannels(receivers)
})
Expand Down Expand Up @@ -267,73 +267,63 @@ func TestLokiSourceAPI_FanOut(t *testing.T) {
}

func TestComponent_detectsWhenUpdateRequiresARestart(t *testing.T) {
httpPort := getFreePort(t)
grpcPort := getFreePort(t, httpPort)
tests := []struct {
name string
args Arguments
newArgs Arguments
changeHttpPort bool
restartRequired bool
}{
{
name: "identical args don't require server restart",
args: testArgsWithPorts(httpPort, grpcPort),
newArgs: testArgsWithPorts(httpPort, grpcPort),
args: testArgs(),
newArgs: testArgs(),
restartRequired: false,
},
{
name: "change in address requires server restart",
args: testArgsWithPorts(httpPort, grpcPort),
newArgs: testArgsWith(t, func(args *Arguments) {
args: testArgs(),
newArgs: testArgsWith(func(args *Arguments) {
args.Server.HTTP.ListenAddress = "localhost"
args.Server.HTTP.ListenPort = httpPort
args.Server.GRPC.ListenPort = grpcPort
}),
restartRequired: true,
},
{
name: "change in port requires server restart",
args: testArgsWithPorts(httpPort, grpcPort),
newArgs: testArgsWithPorts(getFreePort(t, httpPort, grpcPort), grpcPort),
args: testArgs(),
changeHttpPort: true,
newArgs: testArgs(),
restartRequired: true,
},
{
name: "change in forwardTo does not require server restart",
args: testArgsWithPorts(httpPort, grpcPort),
newArgs: testArgsWith(t, func(args *Arguments) {
args: testArgs(),
newArgs: testArgsWith(func(args *Arguments) {
args.ForwardTo = []loki.LogsReceiver{}
args.Server.HTTP.ListenPort = httpPort
args.Server.GRPC.ListenPort = grpcPort
}),
restartRequired: false,
},
{
name: "change in labels does not require server restart",
args: testArgsWithPorts(httpPort, grpcPort),
newArgs: testArgsWith(t, func(args *Arguments) {
args: testArgs(),
newArgs: testArgsWith(func(args *Arguments) {
args.Labels = map[string]string{"some": "label"}
args.Server.HTTP.ListenPort = httpPort
args.Server.GRPC.ListenPort = grpcPort
}),
restartRequired: false,
},
{
name: "change in relabel rules does not require server restart",
args: testArgsWithPorts(httpPort, grpcPort),
newArgs: testArgsWith(t, func(args *Arguments) {
args: testArgs(),
newArgs: testArgsWith(func(args *Arguments) {
args.RelabelRules = relabel.Rules{}
args.Server.HTTP.ListenPort = httpPort
args.Server.GRPC.ListenPort = grpcPort
}),
restartRequired: false,
},
{
name: "change in use incoming timestamp does not require server restart",
args: testArgsWithPorts(httpPort, grpcPort),
newArgs: testArgsWith(t, func(args *Arguments) {
args: testArgs(),
newArgs: testArgsWith(func(args *Arguments) {
args.UseIncomingTimestamp = !args.UseIncomingTimestamp
args.Server.HTTP.ListenPort = httpPort
args.Server.GRPC.ListenPort = grpcPort
}),
restartRequired: false,
},
Expand All @@ -346,6 +336,13 @@ func TestComponent_detectsWhenUpdateRequiresARestart(t *testing.T) {
comp := startTestComponent(t, defaultOptions(), tc.args, ctx)

serverBefore := comp.server

if tc.changeHttpPort {
httpPort, err := freeport.GetFreePort()
require.NoError(t, err)
tc.newArgs.Server.HTTP.ListenPort = httpPort
}

require.NoError(t, comp.Update(tc.newArgs))

restarted := serverBefore != comp.server
Expand All @@ -368,8 +365,7 @@ func TestLokiSourceAPI_TLS(t *testing.T) {
receiver := fake.NewClient(func() {})
defer receiver.Stop()

args := testArgsWith(t, func(a *Arguments) {
a.Server.HTTP.ListenPort = getFreePort(t)
args := testArgsWith(func(a *Arguments) {
a.Server.HTTP.TLSConfig = &fnet.TLSConfig{
Cert: testCert,
Key: alloytypes.Secret(testKey),
Expand All @@ -378,10 +374,10 @@ func TestLokiSourceAPI_TLS(t *testing.T) {
a.UseIncomingTimestamp = true
})
opts := defaultOptions()
_ = startTestComponent(t, opts, args, ctx)
c := startTestComponent(t, opts, args, ctx)

// Create TLS-enabled Loki client
lokiClient := newTestLokiClientTLS(t, args, opts)
lokiClient := newTestLokiClientTLS(t, c.server.HTTPListenAddress(), opts)
defer lokiClient.Stop()

now := time.Now()
Expand All @@ -396,7 +392,9 @@ func TestLokiSourceAPI_TLS(t *testing.T) {

require.Eventually(
t,
func() bool { return len(receiver.Received()) == 1 },
func() bool {
return len(receiver.Received()) == 1
},
10*time.Second,
10*time.Millisecond,
"did not receive the forwarded message within the timeout",
Expand All @@ -412,12 +410,11 @@ func TestLokiSourceAPI_TLS(t *testing.T) {
}

// newTestLokiClientTLS creates a Loki client configured for TLS connections
func newTestLokiClientTLS(t *testing.T, args Arguments, opts component.Options) client.Client {
func newTestLokiClientTLS(t *testing.T, httpListenAddress string, opts component.Options) client.Client {
url := flagext.URLValue{}
err := url.Set(fmt.Sprintf(
"https://%s:%d/api/v1/push",
args.Server.HTTP.ListenAddress,
args.Server.HTTP.ListenPort,
"https://%s/api/v1/push",
httpListenAddress,
))
require.NoError(t, err)

Expand All @@ -439,7 +436,7 @@ func newTestLokiClientTLS(t *testing.T, args Arguments, opts component.Options)
}

func TestDefaultServerConfig(t *testing.T) {
args := testArgs(t)
args := testArgs()
args.Server = nil // user did not define server options

comp, err := New(
Expand Down Expand Up @@ -484,7 +481,7 @@ func startTestComponent(
}

func TestShutdown(t *testing.T) {
args := testArgsWith(t, func(a *Arguments) {
args := testArgsWith(func(a *Arguments) {
a.Server.GracefulShutdownTimeout = 5 * time.Second
a.ForwardTo = []loki.LogsReceiver{loki.NewLogsReceiver()}
})
Expand All @@ -503,13 +500,13 @@ func TestShutdown(t *testing.T) {
waitForServerToBeReady(t, comp)

// First request should be forwarded on channel
_, err = http.DefaultClient.Do(newRequest(t, context.Background(), args))
_, err = http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPListenAddress()))
require.NoError(t, err)

codes := make(chan int)
for range 5 {
go func() {
res, err := http.DefaultClient.Do(newRequest(t, context.Background(), args))
res, err := http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPListenAddress()))
if err != nil || res == nil {
// This should not happen but if it does we return -1 here so test will fail.
codes <- -1
Expand Down Expand Up @@ -537,7 +534,7 @@ func TestShutdown(t *testing.T) {
}

func TestCancelRequest(t *testing.T) {
args := testArgsWith(t, func(a *Arguments) {
args := testArgsWith(func(a *Arguments) {
a.Server.GracefulShutdownTimeout = 5 * time.Second
a.ForwardTo = []loki.LogsReceiver{loki.NewLogsReceiver()}
})
Expand All @@ -556,15 +553,15 @@ func TestCancelRequest(t *testing.T) {
waitForServerToBeReady(t, comp)

// First request should be forwarded on channel
_, err = http.DefaultClient.Do(newRequest(t, context.Background(), args))
_, err = http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPListenAddress()))
require.NoError(t, err)

var wg sync.WaitGroup
for range 5 {
wg.Go(func() {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()
res, err := http.DefaultClient.Do(newRequest(t, ctx, args))
res, err := http.DefaultClient.Do(newRequest(t, ctx, comp.server.HTTPListenAddress()))
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Nil(t, res)
})
Expand All @@ -573,52 +570,44 @@ func TestCancelRequest(t *testing.T) {
wg.Wait()
}

func newRequest(t *testing.T, ctx context.Context, args Arguments) *http.Request {
func newRequest(t *testing.T, ctx context.Context, httpListendAddress string) *http.Request {
body := bytes.Buffer{}
err := util.SerializeProto(&body, &push.PushRequest{Streams: []push.Stream{{Labels: `{foo="foo"}`, Entries: []push.Entry{{Line: "line"}}}}}, util.RawSnappy)
require.NoError(t, err)
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://%s:%d/loki/api/v1/push", args.Server.HTTP.ListenAddress, args.Server.HTTP.ListenPort), &body)
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://%s/loki/api/v1/push", httpListendAddress), &body)
require.NoError(t, err)
req.Header.Set("Content-Type", "application/x-protobuf")
return req
}

func waitForServerToBeReady(t *testing.T, comp *Component) {
require.Eventuallyf(t, func() bool {
// Determine if TLS is enabled to choose the right protocol
protocol := "http"
var tlsConfig *tls.Config

serverConfig := comp.server.ServerConfig()
if serverConfig.HTTP.TLSConfig != nil {
protocol = "https"
tlsConfig = &tls.Config{
InsecureSkipVerify: true,
}
// Determine if TLS is enabled to choose the right protocol
protocol := "http"
var tlsConfig *tls.Config

serverConfig := comp.server.ServerConfig()
if serverConfig.HTTP.TLSConfig != nil {
protocol = "https"
tlsConfig = &tls.Config{
InsecureSkipVerify: true,
}
}

url := fmt.Sprintf(
"%s://%v:%d/wrong/url",
protocol,
serverConfig.HTTP.ListenAddress,
serverConfig.HTTP.ListenPort,
)

var resp *http.Response
var err error
url := fmt.Sprintf(
"%s://%s/wrong/url",
protocol,
comp.server.HTTPListenAddress(),
)

if protocol == "https" {
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
Timeout: 1 * time.Second,
}
resp, err = client.Get(url)
} else {
client := &http.Client{Timeout: 1 * time.Second}
resp, err = client.Get(url)
client := &http.Client{Timeout: 1 * time.Second}
if protocol == "https" {
client.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
}

require.Eventuallyf(t, func() bool {
resp, err := client.Get(url)

return err == nil && resp != nil && resp.StatusCode == 404
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
Expand Down Expand Up @@ -661,16 +650,14 @@ func defaultOptions() component.Options {
}
}

func testArgsWith(t *testing.T, mutator func(arguments *Arguments)) Arguments {
a := testArgs(t)
mutator(&a)
return a
func testArgs() Arguments {
return testArgsWithPorts(0, 0)
}

func testArgs(t *testing.T) Arguments {
httpPort := getFreePort(t)
grpPort := getFreePort(t, httpPort)
return testArgsWithPorts(httpPort, grpPort)
func testArgsWith(mutator func(arguments *Arguments)) Arguments {
a := testArgsWithPorts(0, 0)
mutator(&a)
return a
}

func testArgsWithPorts(httpPort int, grpcPort int) Arguments {
Expand Down Expand Up @@ -698,17 +685,3 @@ func testArgsWithPorts(httpPort int, grpcPort int) Arguments {
MaxSendMessageSize: 100 * units.MiB,
}
}

func getFreePort(t *testing.T, exclude ...int) int {
const maxRetries = 10
for range maxRetries {
port, err := freeport.GetFreePort()
require.NoError(t, err)
if !slices.Contains(exclude, port) {
return port
}
}

t.Fatal("fail to get free port")
return 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ func (s *PushAPIServer) ServerConfig() fnet.ServerConfig {
return *s.serverConfig
}

func (s *PushAPIServer) HTTPListenAddress() string {
return s.server.HTTPListenAddr()
}

func (s *PushAPIServer) GRPCListenAddress() string {
return s.server.GRPCListenAddr()
}

func (s *PushAPIServer) Shutdown() {
level.Info(s.logger).Log("msg", "stopping push API server")
// StopAndShutdown tries to gracefully shutdown.
Expand Down
Loading