diff --git a/internal/cmd/integration-tests/docker-compose.yaml b/internal/cmd/integration-tests/docker-compose.yaml index 030e733ce88..ede4a19724f 100644 --- a/internal/cmd/integration-tests/docker-compose.yaml +++ b/internal/cmd/integration-tests/docker-compose.yaml @@ -64,7 +64,7 @@ services: - integration-tests loki: - image: grafana/loki:latest + image: grafana/loki:3.5.8 command: -config.file=/etc/loki/local-config.yaml ports: - "3100:3100" diff --git a/internal/component/loki/source/api/api_test.go b/internal/component/loki/source/api/api_test.go index da383cd5d6d..2a33ea8b550 100644 --- a/internal/component/loki/source/api/api_test.go +++ b/internal/component/loki/source/api/api_test.go @@ -99,7 +99,7 @@ func TestLokiSourceAPI_Simple(t *testing.T) { receiver := fake.NewClient(func() {}) defer receiver.Stop() - args := testArgsWith(t, func(a *Arguments) { + args := testArgsWith(func(a *Arguments) { a.Server.HTTP.ListenPort = 8532 a.ForwardTo = []loki.LogsReceiver{receiver.LogsReceiver()} a.UseIncomingTimestamp = true @@ -144,7 +144,7 @@ func TestLokiSourceAPI_Update(t *testing.T) { receiver := fake.NewClient(func() {}) defer receiver.Stop() - args := testArgsWith(t, func(a *Arguments) { + args := testArgsWith(func(a *Arguments) { a.Server.HTTP.ListenPort = 8583 a.ForwardTo = []loki.LogsReceiver{receiver.LogsReceiver()} a.UseIncomingTimestamp = true @@ -221,7 +221,7 @@ func TestLokiSourceAPI_FanOut(t *testing.T) { receivers[i] = fake.NewClient(func() {}) } - args := testArgsWith(t, func(a *Arguments) { + args := testArgsWith(func(a *Arguments) { a.Server.HTTP.ListenPort = 8537 a.ForwardTo = mapToChannels(receivers) }) @@ -267,73 +267,63 @@ func TestLokiSourceAPI_FanOut(t *testing.T) { } func TestComponent_detectsWhenUpdateRequiresARestart(t *testing.T) { - httpPort := getFreePort(t) - grpcPort := getFreePort(t, httpPort) tests := []struct { name string args Arguments newArgs Arguments + changeHttpPort bool restartRequired bool }{ { name: "identical args don't require server restart", - args: testArgsWithPorts(httpPort, grpcPort), - newArgs: testArgsWithPorts(httpPort, grpcPort), + args: testArgs(), + newArgs: testArgs(), restartRequired: false, }, { name: "change in address requires server restart", - args: testArgsWithPorts(httpPort, grpcPort), - newArgs: testArgsWith(t, func(args *Arguments) { + args: testArgs(), + newArgs: testArgsWith(func(args *Arguments) { args.Server.HTTP.ListenAddress = "localhost" - args.Server.HTTP.ListenPort = httpPort - args.Server.GRPC.ListenPort = grpcPort }), restartRequired: true, }, { name: "change in port requires server restart", - args: testArgsWithPorts(httpPort, grpcPort), - newArgs: testArgsWithPorts(getFreePort(t, httpPort, grpcPort), grpcPort), + args: testArgs(), + changeHttpPort: true, + newArgs: testArgs(), restartRequired: true, }, { name: "change in forwardTo does not require server restart", - args: testArgsWithPorts(httpPort, grpcPort), - newArgs: testArgsWith(t, func(args *Arguments) { + args: testArgs(), + newArgs: testArgsWith(func(args *Arguments) { args.ForwardTo = []loki.LogsReceiver{} - args.Server.HTTP.ListenPort = httpPort - args.Server.GRPC.ListenPort = grpcPort }), restartRequired: false, }, { name: "change in labels does not require server restart", - args: testArgsWithPorts(httpPort, grpcPort), - newArgs: testArgsWith(t, func(args *Arguments) { + args: testArgs(), + newArgs: testArgsWith(func(args *Arguments) { args.Labels = map[string]string{"some": "label"} - args.Server.HTTP.ListenPort = httpPort - args.Server.GRPC.ListenPort = grpcPort }), restartRequired: false, }, { name: "change in relabel rules does not require server restart", - args: testArgsWithPorts(httpPort, grpcPort), - newArgs: testArgsWith(t, func(args *Arguments) { + args: testArgs(), + newArgs: testArgsWith(func(args *Arguments) { args.RelabelRules = relabel.Rules{} - args.Server.HTTP.ListenPort = httpPort - args.Server.GRPC.ListenPort = grpcPort }), restartRequired: false, }, { name: "change in use incoming timestamp does not require server restart", - args: testArgsWithPorts(httpPort, grpcPort), - newArgs: testArgsWith(t, func(args *Arguments) { + args: testArgs(), + newArgs: testArgsWith(func(args *Arguments) { args.UseIncomingTimestamp = !args.UseIncomingTimestamp - args.Server.HTTP.ListenPort = httpPort - args.Server.GRPC.ListenPort = grpcPort }), restartRequired: false, }, @@ -346,6 +336,13 @@ func TestComponent_detectsWhenUpdateRequiresARestart(t *testing.T) { comp := startTestComponent(t, defaultOptions(), tc.args, ctx) serverBefore := comp.server + + if tc.changeHttpPort { + httpPort, err := freeport.GetFreePort() + require.NoError(t, err) + tc.newArgs.Server.HTTP.ListenPort = httpPort + } + require.NoError(t, comp.Update(tc.newArgs)) restarted := serverBefore != comp.server @@ -368,8 +365,7 @@ func TestLokiSourceAPI_TLS(t *testing.T) { receiver := fake.NewClient(func() {}) defer receiver.Stop() - args := testArgsWith(t, func(a *Arguments) { - a.Server.HTTP.ListenPort = getFreePort(t) + args := testArgsWith(func(a *Arguments) { a.Server.HTTP.TLSConfig = &fnet.TLSConfig{ Cert: testCert, Key: alloytypes.Secret(testKey), @@ -378,10 +374,10 @@ func TestLokiSourceAPI_TLS(t *testing.T) { a.UseIncomingTimestamp = true }) opts := defaultOptions() - _ = startTestComponent(t, opts, args, ctx) + c := startTestComponent(t, opts, args, ctx) // Create TLS-enabled Loki client - lokiClient := newTestLokiClientTLS(t, args, opts) + lokiClient := newTestLokiClientTLS(t, c.server.HTTPListenAddress(), opts) defer lokiClient.Stop() now := time.Now() @@ -396,7 +392,9 @@ func TestLokiSourceAPI_TLS(t *testing.T) { require.Eventually( t, - func() bool { return len(receiver.Received()) == 1 }, + func() bool { + return len(receiver.Received()) == 1 + }, 10*time.Second, 10*time.Millisecond, "did not receive the forwarded message within the timeout", @@ -412,12 +410,11 @@ func TestLokiSourceAPI_TLS(t *testing.T) { } // newTestLokiClientTLS creates a Loki client configured for TLS connections -func newTestLokiClientTLS(t *testing.T, args Arguments, opts component.Options) client.Client { +func newTestLokiClientTLS(t *testing.T, httpListenAddress string, opts component.Options) client.Client { url := flagext.URLValue{} err := url.Set(fmt.Sprintf( - "https://%s:%d/api/v1/push", - args.Server.HTTP.ListenAddress, - args.Server.HTTP.ListenPort, + "https://%s/api/v1/push", + httpListenAddress, )) require.NoError(t, err) @@ -439,7 +436,7 @@ func newTestLokiClientTLS(t *testing.T, args Arguments, opts component.Options) } func TestDefaultServerConfig(t *testing.T) { - args := testArgs(t) + args := testArgs() args.Server = nil // user did not define server options comp, err := New( @@ -484,7 +481,7 @@ func startTestComponent( } func TestShutdown(t *testing.T) { - args := testArgsWith(t, func(a *Arguments) { + args := testArgsWith(func(a *Arguments) { a.Server.GracefulShutdownTimeout = 5 * time.Second a.ForwardTo = []loki.LogsReceiver{loki.NewLogsReceiver()} }) @@ -503,13 +500,13 @@ func TestShutdown(t *testing.T) { waitForServerToBeReady(t, comp) // First request should be forwarded on channel - _, err = http.DefaultClient.Do(newRequest(t, context.Background(), args)) + _, err = http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPListenAddress())) require.NoError(t, err) codes := make(chan int) for range 5 { go func() { - res, err := http.DefaultClient.Do(newRequest(t, context.Background(), args)) + res, err := http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPListenAddress())) if err != nil || res == nil { // This should not happen but if it does we return -1 here so test will fail. codes <- -1 @@ -537,7 +534,7 @@ func TestShutdown(t *testing.T) { } func TestCancelRequest(t *testing.T) { - args := testArgsWith(t, func(a *Arguments) { + args := testArgsWith(func(a *Arguments) { a.Server.GracefulShutdownTimeout = 5 * time.Second a.ForwardTo = []loki.LogsReceiver{loki.NewLogsReceiver()} }) @@ -556,7 +553,7 @@ func TestCancelRequest(t *testing.T) { waitForServerToBeReady(t, comp) // First request should be forwarded on channel - _, err = http.DefaultClient.Do(newRequest(t, context.Background(), args)) + _, err = http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPListenAddress())) require.NoError(t, err) var wg sync.WaitGroup @@ -564,7 +561,7 @@ func TestCancelRequest(t *testing.T) { wg.Go(func() { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) defer cancel() - res, err := http.DefaultClient.Do(newRequest(t, ctx, args)) + res, err := http.DefaultClient.Do(newRequest(t, ctx, comp.server.HTTPListenAddress())) require.ErrorIs(t, err, context.DeadlineExceeded) require.Nil(t, res) }) @@ -573,52 +570,44 @@ func TestCancelRequest(t *testing.T) { wg.Wait() } -func newRequest(t *testing.T, ctx context.Context, args Arguments) *http.Request { +func newRequest(t *testing.T, ctx context.Context, httpListendAddress string) *http.Request { body := bytes.Buffer{} err := util.SerializeProto(&body, &push.PushRequest{Streams: []push.Stream{{Labels: `{foo="foo"}`, Entries: []push.Entry{{Line: "line"}}}}}, util.RawSnappy) require.NoError(t, err) - req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://%s:%d/loki/api/v1/push", args.Server.HTTP.ListenAddress, args.Server.HTTP.ListenPort), &body) + req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://%s/loki/api/v1/push", httpListendAddress), &body) require.NoError(t, err) req.Header.Set("Content-Type", "application/x-protobuf") return req } func waitForServerToBeReady(t *testing.T, comp *Component) { - require.Eventuallyf(t, func() bool { - // Determine if TLS is enabled to choose the right protocol - protocol := "http" - var tlsConfig *tls.Config - - serverConfig := comp.server.ServerConfig() - if serverConfig.HTTP.TLSConfig != nil { - protocol = "https" - tlsConfig = &tls.Config{ - InsecureSkipVerify: true, - } + // Determine if TLS is enabled to choose the right protocol + protocol := "http" + var tlsConfig *tls.Config + + serverConfig := comp.server.ServerConfig() + if serverConfig.HTTP.TLSConfig != nil { + protocol = "https" + tlsConfig = &tls.Config{ + InsecureSkipVerify: true, } + } - url := fmt.Sprintf( - "%s://%v:%d/wrong/url", - protocol, - serverConfig.HTTP.ListenAddress, - serverConfig.HTTP.ListenPort, - ) - - var resp *http.Response - var err error + url := fmt.Sprintf( + "%s://%s/wrong/url", + protocol, + comp.server.HTTPListenAddress(), + ) - if protocol == "https" { - client := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: tlsConfig, - }, - Timeout: 1 * time.Second, - } - resp, err = client.Get(url) - } else { - client := &http.Client{Timeout: 1 * time.Second} - resp, err = client.Get(url) + client := &http.Client{Timeout: 1 * time.Second} + if protocol == "https" { + client.Transport = &http.Transport{ + TLSClientConfig: tlsConfig, } + } + + require.Eventuallyf(t, func() bool { + resp, err := client.Get(url) return err == nil && resp != nil && resp.StatusCode == 404 }, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout") @@ -661,16 +650,14 @@ func defaultOptions() component.Options { } } -func testArgsWith(t *testing.T, mutator func(arguments *Arguments)) Arguments { - a := testArgs(t) - mutator(&a) - return a +func testArgs() Arguments { + return testArgsWithPorts(0, 0) } -func testArgs(t *testing.T) Arguments { - httpPort := getFreePort(t) - grpPort := getFreePort(t, httpPort) - return testArgsWithPorts(httpPort, grpPort) +func testArgsWith(mutator func(arguments *Arguments)) Arguments { + a := testArgsWithPorts(0, 0) + mutator(&a) + return a } func testArgsWithPorts(httpPort int, grpcPort int) Arguments { @@ -698,17 +685,3 @@ func testArgsWithPorts(httpPort int, grpcPort int) Arguments { MaxSendMessageSize: 100 * units.MiB, } } - -func getFreePort(t *testing.T, exclude ...int) int { - const maxRetries = 10 - for range maxRetries { - port, err := freeport.GetFreePort() - require.NoError(t, err) - if !slices.Contains(exclude, port) { - return port - } - } - - t.Fatal("fail to get free port") - return 0 -} diff --git a/internal/component/loki/source/api/internal/lokipush/push_api_server.go b/internal/component/loki/source/api/internal/lokipush/push_api_server.go index 122d17000a6..34888fffb33 100644 --- a/internal/component/loki/source/api/internal/lokipush/push_api_server.go +++ b/internal/component/loki/source/api/internal/lokipush/push_api_server.go @@ -119,6 +119,14 @@ func (s *PushAPIServer) ServerConfig() fnet.ServerConfig { return *s.serverConfig } +func (s *PushAPIServer) HTTPListenAddress() string { + return s.server.HTTPListenAddr() +} + +func (s *PushAPIServer) GRPCListenAddress() string { + return s.server.GRPCListenAddr() +} + func (s *PushAPIServer) Shutdown() { level.Info(s.logger).Log("msg", "stopping push API server") // StopAndShutdown tries to gracefully shutdown.