38 changes: 5 additions & 33 deletions cli/cli/commands/grafloki/start/start.go
@@ -2,7 +2,6 @@ package start

import (
"context"
"fmt"
"github.com/kurtosis-tech/kurtosis/cli/cli/command_framework/lowlevel"
"github.com/kurtosis-tech/kurtosis/cli/cli/command_framework/lowlevel/args"
"github.com/kurtosis-tech/kurtosis/cli/cli/command_framework/lowlevel/flags"
@@ -11,8 +10,6 @@ import (
"github.com/kurtosis-tech/kurtosis/cli/cli/helpers/engine_manager"
"github.com/kurtosis-tech/kurtosis/cli/cli/helpers/grafloki"
"github.com/kurtosis-tech/kurtosis/cli/cli/helpers/kurtosis_config_getter"
"github.com/kurtosis-tech/kurtosis/cli/cli/kurtosis_config/resolved_config"
"github.com/kurtosis-tech/kurtosis/cli/cli/out"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/logs_aggregator"
"github.com/kurtosis-tech/stacktrace"
"github.com/sirupsen/logrus"
@@ -44,6 +41,7 @@ func run(
if err != nil {
return stacktrace.Propagate(err, "An error occurred getting Kurtosis cluster config.")
}
clusterConfig.GetClusterType()

// NOTE(tedi 04/03/25): If you're wondering why the grafana / loki instance is being started by the CLI (and not in container-engine-lib via KurtosisBackend as with LogsCollector and LogsAggregator), here's why:
// 1. now that Kurtosis is purely OSS, it's important to reduce maintenance surface / complexity inside Kurtosis core (Engine, APIContainer, KurtosisBackend, Starlark Engine)
@@ -54,44 +52,18 @@ func run(
// putting it in the CLI is saying - “You could set up Grafana and Loki yourself, and then restart the engine to point to it, Kurtosis CLI will do that for you to save you a step”
// putting it in Kurtosis core is saying - “Grafana and Loki are a core and necessary part of the Kurtosis platform and support the Kurtosis abstraction/value prop” - which is not the case
// https://drawpaintacademy.com/the-bull/
var lokiHost string
var grafanaUrl string
switch clusterConfig.GetClusterType() {
case resolved_config.KurtosisClusterType_Docker:
lokiHost, grafanaUrl, err = grafloki.StartGrafLokiInDocker(ctx)
if err != nil {
return stacktrace.Propagate(err, "An error occurred starting Grafana and Loki in Docker.")
}
case resolved_config.KurtosisClusterType_Kubernetes:
lokiHost, grafanaUrl, err = grafloki.StartGrafLokiInKubernetes(ctx)
if err != nil {
return stacktrace.Propagate(err, "An error occurred starting Grafana and Loki in Kubernetes.")
}
default:
return stacktrace.NewError("Unsupported cluster type: %v", clusterConfig.GetClusterType().String())
}

// This matches the exact configurations here: https://vector.dev/docs/reference/configuration/sinks/loki/
lokiSink := map[string]map[string]interface{}{
"loki": {
"type": "loki",
"endpoint": lokiHost,
"encoding": map[string]string{
"codec": "json",
},
"labels": map[string]string{
"job": "kurtosis",
},
},
lokiSink, _, err := grafloki.StartGrafloki(ctx, clusterConfig.GetClusterType(), clusterConfig.GetGraflokiConfig())
if err != nil {
return err // already wrapped
}
//logrus.Infof("Grafana running at %v", grafanaUrl)

logrus.Infof("Configuring engine to send logs to Loki...")
err = restartEngineWithLogsSink(ctx, lokiSink)
if err != nil {
return stacktrace.Propagate(err, "An error occurred restarting engine to be configured to send logs to Loki.")
}

out.PrintOutLn(fmt.Sprintf("Grafana running at %v", grafanaUrl))
return nil
}

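The new `grafloki.StartGrafloki` helper itself is not part of this diff, but its shape is implied by the call sites and by the switch it replaces. A minimal sketch, assuming `GraflokiConfig` is the type behind `GetGraflokiConfig()` and that `logs_aggregator.Sinks` is the `map[string]map[string]interface{}` previously built inline; the real helper in `cli/cli/helpers/grafloki` may differ:

```go
package grafloki

import (
	"context"

	"github.com/kurtosis-tech/kurtosis/cli/cli/kurtosis_config/resolved_config"
	"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/logs_aggregator"
	"github.com/kurtosis-tech/stacktrace"
)

// StartGrafloki starts Grafana and Loki for the given cluster type and returns
// the Vector Loki sink the engine should be configured with, plus the Grafana URL.
func StartGrafloki(
	ctx context.Context,
	clusterType resolved_config.KurtosisClusterType,
	graflokiConfig resolved_config.GraflokiConfig, // assumed type name; unused in this sketch
) (logs_aggregator.Sinks, string, error) {
	var lokiHost string
	var grafanaUrl string
	var err error
	switch clusterType {
	case resolved_config.KurtosisClusterType_Docker:
		lokiHost, grafanaUrl, err = StartGrafLokiInDocker(ctx)
	case resolved_config.KurtosisClusterType_Kubernetes:
		lokiHost, grafanaUrl, err = StartGrafLokiInKubernetes(ctx)
	default:
		return nil, "", stacktrace.NewError("Unsupported cluster type: %v", clusterType.String())
	}
	if err != nil {
		return nil, "", stacktrace.Propagate(err, "An error occurred starting Grafana and Loki.")
	}

	// This matches the exact configurations here: https://vector.dev/docs/reference/configuration/sinks/loki/
	lokiSink := logs_aggregator.Sinks{
		"loki": {
			"type":     "loki",
			"endpoint": lokiHost,
			"encoding": map[string]string{
				"codec": "json",
			},
			"labels": map[string]string{
				"job": "kurtosis",
			},
		},
	}
	return lokiSink, grafanaUrl, nil
}
```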
18 changes: 2 additions & 16 deletions cli/cli/commands/grafloki/stop/stop.go
@@ -8,8 +8,6 @@ import (
"github.com/kurtosis-tech/kurtosis/cli/cli/command_str_consts"
"github.com/kurtosis-tech/kurtosis/cli/cli/helpers/grafloki"
"github.com/kurtosis-tech/kurtosis/cli/cli/helpers/kurtosis_config_getter"
"github.com/kurtosis-tech/kurtosis/cli/cli/kurtosis_config/resolved_config"
"github.com/kurtosis-tech/kurtosis/cli/cli/out"
"github.com/kurtosis-tech/stacktrace"
)

@@ -34,21 +32,9 @@ func run(
return stacktrace.Propagate(err, "An error occurred getting Kurtosis cluster config.")
}

switch clusterConfig.GetClusterType() {
case resolved_config.KurtosisClusterType_Docker:
err := grafloki.StopGrafLokiInDocker(ctx)
if err != nil {
return stacktrace.Propagate(err, "An error occurred stopping Grafana and Loki containers in Docker.")
}
case resolved_config.KurtosisClusterType_Kubernetes:
err := grafloki.StopGrafLokiInKubernetes(ctx)
if err != nil {
return stacktrace.Propagate(err, "An error occurred stopping Grafana and Loki containers in Kubernetes.")
}
default:
return stacktrace.NewError("Unsupported cluster type: %v", clusterConfig.GetClusterType().String())
if err = grafloki.StopGrafloki(ctx, clusterConfig.GetClusterType()); err != nil {
return err // already wrapped
}

out.PrintOutLn("Successfully stopped Grafana and Loki containers.")
return nil
}
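`grafloki.StopGrafloki` is likewise only visible here through its call site; a minimal sketch, assuming it simply absorbs the switch removed above:

```go
// StopGrafloki stops the Grafana and Loki containers for the given cluster
// type. Sketch only; the real helper lives in cli/cli/helpers/grafloki.
func StopGrafloki(ctx context.Context, clusterType resolved_config.KurtosisClusterType) error {
	switch clusterType {
	case resolved_config.KurtosisClusterType_Docker:
		if err := StopGrafLokiInDocker(ctx); err != nil {
			return stacktrace.Propagate(err, "An error occurred stopping Grafana and Loki containers in Docker.")
		}
	case resolved_config.KurtosisClusterType_Kubernetes:
		if err := StopGrafLokiInKubernetes(ctx); err != nil {
			return stacktrace.Propagate(err, "An error occurred stopping Grafana and Loki containers in Kubernetes.")
		}
	default:
		return stacktrace.NewError("Unsupported cluster type: %v", clusterType.String())
	}
	return nil
}
```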
9 changes: 9 additions & 0 deletions cli/cli/helpers/engine_manager/engine_existence_guarantor.go
@@ -95,6 +95,9 @@ type engineExistenceGuarantor struct {

// Destinations the logs aggregator will deliver to
sinks logs_aggregator.Sinks

// If set to true, the engine will store logs in a persistent volume
shouldEnablePersistentVolumeLogsCollection bool
}

func newEngineExistenceGuarantorWithDefaultVersion(
@@ -116,6 +119,7 @@ func newEngineExistenceGuarantorWithDefaultVersion(
domain string,
logRetentionPeriod string,
sinks logs_aggregator.Sinks,
shouldEnablePersistentVolumeLogsCollection bool,
) *engineExistenceGuarantor {
return newEngineExistenceGuarantorWithCustomVersion(
ctx,
@@ -137,6 +141,7 @@ func newEngineExistenceGuarantorWithDefaultVersion(
domain,
logRetentionPeriod,
sinks,
shouldEnablePersistentVolumeLogsCollection,
)
}

@@ -160,6 +165,7 @@ func newEngineExistenceGuarantorWithCustomVersion(
domain string,
logRetentionPeriod string,
sinks logs_aggregator.Sinks,
shouldEnablePersistentVolumeLogsCollection bool,
) *engineExistenceGuarantor {
return &engineExistenceGuarantor{
ctx: ctx,
@@ -183,6 +189,7 @@ func newEngineExistenceGuarantorWithCustomVersion(
domain: domain,
logRetentionPeriod: logRetentionPeriod,
sinks: sinks,
shouldEnablePersistentVolumeLogsCollection: shouldEnablePersistentVolumeLogsCollection,
}
}

@@ -245,6 +252,7 @@ func (guarantor *engineExistenceGuarantor) VisitStopped() error {
guarantor.domain,
guarantor.logRetentionPeriod,
guarantor.sinks,
guarantor.shouldEnablePersistentVolumeLogsCollection,
)
} else {
_, _, engineLaunchErr = guarantor.engineServerLauncher.LaunchWithCustomVersion(
@@ -268,6 +276,7 @@ func (guarantor *engineExistenceGuarantor) VisitStopped() error {
guarantor.domain,
guarantor.logRetentionPeriod,
guarantor.sinks,
guarantor.shouldEnablePersistentVolumeLogsCollection,
)
}
if engineLaunchErr != nil {
44 changes: 37 additions & 7 deletions cli/cli/helpers/engine_manager/engine_manager.go
@@ -2,6 +2,7 @@ package engine_manager

import (
"context"
"github.com/kurtosis-tech/kurtosis/cli/cli/helpers/grafloki"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/logs_aggregator"
"strings"
"time"
@@ -197,6 +198,19 @@ func (manager *EngineManager) StartEngineIdempotentlyWithDefaultVersion(
logrus.Debugf("Engine status: '%v'", status)
clusterType := manager.clusterConfig.GetClusterType()

var lokiSink logs_aggregator.Sinks
var grafanaUrl string
if manager.clusterConfig.GetGraflokiConfig().ShouldStartBeforeEngine {
lokiSink, grafanaUrl, err = grafloki.StartGrafloki(ctx, clusterType, manager.clusterConfig.GetGraflokiConfig())
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred starting Grafana and Loki before engine.")
}
}
if grafanaUrl != "" {
logrus.Infof("Grafana running at %v", grafanaUrl)
}
additionalSinks = combineSinks(additionalSinks, lokiSink)

engineGuarantor := newEngineExistenceGuarantorWithDefaultVersion(
ctx,
maybeHostMachinePortBinding,
@@ -215,7 +229,8 @@ func (manager *EngineManager) StartEngineIdempotentlyWithDefaultVersion(
restartAPIContainers,
domain,
logRetentionPeriodStr,
combineAdditionalSinksAndConfigSinks(additionalSinks, manager.clusterConfig.GetLogsAggregatorConfig().Sinks),
combineSinks(manager.clusterConfig.GetLogsAggregatorConfig().Sinks, additionalSinks),
manager.clusterConfig.ShouldEnableDefaultLogsSink(),
)
// TODO Need to handle the Kubernetes case, where a gateway needs to be started after the engine is started but
// before we can return an EngineClient
@@ -247,6 +262,20 @@ func (manager *EngineManager) StartEngineIdempotentlyWithCustomVersion(ctx conte
}

clusterType := manager.clusterConfig.GetClusterType()

var lokiSink logs_aggregator.Sinks
var grafanaUrl string
if manager.clusterConfig.GetGraflokiConfig().ShouldStartBeforeEngine {
lokiSink, grafanaUrl, err = grafloki.StartGrafloki(ctx, clusterType, manager.clusterConfig.GetGraflokiConfig())
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred starting Grafana and Loki before engine.")
}
}
if grafanaUrl != "" {
logrus.Infof("Grafana running at %v", grafanaUrl)
}
additionalSinks = combineSinks(additionalSinks, lokiSink)

engineGuarantor := newEngineExistenceGuarantorWithCustomVersion(
ctx,
maybeHostMachinePortBinding,
@@ -266,7 +295,8 @@ func (manager *EngineManager) StartEngineIdempotentlyWithCustomVersion(ctx conte
restartAPIContainers,
domain,
logRetentionPeriodStr,
combineAdditionalSinksAndConfigSinks(additionalSinks, manager.clusterConfig.GetLogsAggregatorConfig().Sinks),
combineSinks(manager.clusterConfig.GetLogsAggregatorConfig().Sinks, additionalSinks),
manager.clusterConfig.ShouldEnableDefaultLogsSink(),
)
engineClient, engineClientCloseFunc, err := manager.startEngineWithGuarantor(ctx, status, engineGuarantor)
if err != nil {
@@ -528,14 +558,14 @@ func (manager *EngineManager) waitUntilEngineStoppedOrError(ctx context.Context)
return stacktrace.NewError("Engine did not report stopped status, last status reported was '%v'", status)
}

// combineAdditionalSinksAndConfigSinks will combine additionalSinks and configSinks
// note: additionalSinks will override configSinks in case of an id clash
func combineAdditionalSinksAndConfigSinks(additionalSinks logs_aggregator.Sinks, configSinks logs_aggregator.Sinks) logs_aggregator.Sinks {
// combineSinks aggregates sinks and sinksToAdd
// note: sinksToAdd will override sinks in case of an id clash
func combineSinks(sinks logs_aggregator.Sinks, sinksToAdd logs_aggregator.Sinks) logs_aggregator.Sinks {
combinedSinks := logs_aggregator.Sinks{}
for sinkId, sink := range configSinks {
for sinkId, sink := range sinks {
combinedSinks[sinkId] = sink
}
for sinkId, sink := range additionalSinks {
for sinkId, sink := range sinksToAdd {
combinedSinks[sinkId] = sink
}
return combinedSinks
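One behavioral note on the `combineAdditionalSinksAndConfigSinks` → `combineSinks` rename: precedence is now encoded purely in argument order, with the second argument winning on an id clash. A small illustration with hypothetical sink ids, again assuming `logs_aggregator.Sinks` is `map[string]map[string]interface{}`:

```go
configSinks := logs_aggregator.Sinks{
	"loki": {"type": "loki", "endpoint": "http://config-loki:3100"},
}
additionalSinks := logs_aggregator.Sinks{
	"loki": {"type": "loki", "endpoint": "http://cli-loki:3100"},
}

// The second argument overrides on an id clash, so the CLI-provided sink wins,
// matching the old "additionalSinks override configSinks" behavior.
combined := combineSinks(configSinks, additionalSinks)
// combined["loki"]["endpoint"] == "http://cli-loki:3100"
```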