Skip to content

Commit

Permalink
Merge branch 'feature/#5722/datastorage' into 'develop'
Browse files Browse the repository at this point in the history
feature(#5722): Merge cleaning workers to one worker

See merge request canopsis/canopsis-pro!4618
  • Loading branch information
mmourcia committed Mar 7, 2025
2 parents a86b908 + e0f9495 commit 8182230
Show file tree
Hide file tree
Showing 41 changed files with 2,247 additions and 1,176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ TimeToExecute = "Sunday,23"
MaxUpdates = 100000
# MongoClientTimeout specifies the amount of time that a single operation run on Mongo Client can execute before returning an error.
MongoClientTimeout = "1m"
# Timeout specifies the amount of time that a whole cleaning process can execute before returning an error.
Timeout = "1h"

# It's required to restart api and engines after canopsis-reconfigure updates mongodb
# to catch up following parameters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ TimeToExecute = "Sunday,23"
MaxUpdates = 100000
# MongoClientTimeout specifies the amount of time that a single operation run on Mongo Client can execute before returning an error.
MongoClientTimeout = "1m"
# Timeout specifies the amount of time that a whole cleaning process can execute before returning an error.
Timeout = "1h"

# It's required to restart api and engines after canopsis-reconfigure updates mongodb
# to catch up following parameters.
Expand Down
28 changes: 1 addition & 27 deletions community/go-engines-community/cmd/engine-fifo/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,16 @@ package main
import (
"context"

"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/canopsis"
"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/canopsis/config"
libengine "git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/canopsis/engine"
"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/canopsis/eventfilter"
"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/canopsis/metrics"
"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/depmake"
"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/fifo"
"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/postgres"
"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/utils"
"github.com/rs/zerolog"
)

func NewEngine(ctx context.Context, options fifo.Options, logger zerolog.Logger) libengine.Engine {
defer depmake.Catch(logger)

var m depmake.DependencyMaker
dbClient := m.DepMongoClient(ctx, logger)
cfg := m.DepConfig(ctx, dbClient)
config.SetDbClientRetry(dbClient, cfg)
eventFilterEventCounter := eventfilter.NewEventCounter(dbClient,
utils.MinDuration(canopsis.DefaultFlushInterval, options.PeriodicalWaitTime), logger)
eventFilterFailureService := eventfilter.NewFailureService(dbClient,
utils.MinDuration(canopsis.DefaultFlushInterval, options.PeriodicalWaitTime), logger)
pgPoolProvider := postgres.NewPoolProvider(cfg.Global.ReconnectRetries, cfg.Global.GetReconnectTimeout())
metricsConfigProvider := config.NewMetricsConfigProvider(cfg, logger)
metricsSender := metrics.NewTimescaleDBSender(pgPoolProvider, metricsConfigProvider, logger)
engine := fifo.Default(ctx, options, dbClient, cfg, eventfilter.NewExternalDataGetterContainer(),
config.NewTimezoneConfigProvider(cfg, logger), config.NewTemplateConfigProvider(cfg, logger), metricsConfigProvider,
eventFilterEventCounter, eventFilterFailureService, metricsSender, logger)
engine.AddDeferFunc(func(ctx context.Context) {
pgPoolProvider.Close()
})
engine.AddRoutine(func(ctx context.Context) error {
metricsSender.Run(ctx)
return nil
})
engine, _ := fifo.Default(ctx, options, logger)

return engine
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/canopsis"
"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/canopsis/alarm"
"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/canopsis/config"
"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/canopsis/datastorage"
"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/canopsis/encoding/json"
"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/canopsis/engine"
"git.canopsis.net/canopsis/canopsis-community/community/go-engines-community/lib/canopsis/entity"
Expand Down Expand Up @@ -46,7 +45,6 @@ func NewEnginePBehavior(ctx context.Context, options Options, logger zerolog.Log
cfg := m.DepConfig(ctx, dbClient)
config.SetDbClientRetry(dbClient, cfg)
timezoneConfigProvider := config.NewTimezoneConfigProvider(cfg, logger)
dataStorageConfigProvider := config.NewDataStorageConfigProvider(cfg, logger)
amqpConnection := m.DepAmqpConnection(logger, cfg)
amqpChannel := m.DepAMQPChannelPub(amqpConnection)
pbhRedisSession := m.DepRedisSession(ctx, redis.PBehaviorLockStorage, logger, cfg)
Expand Down Expand Up @@ -102,7 +100,7 @@ func NewEnginePBehavior(ctx context.Context, options Options, logger zerolog.Log
return nil
},
func(ctx context.Context) {
err := dbClient.Disconnect(ctx)
err := dbClient.Disconnect(context.WithoutCancel(ctx))
if err != nil {
logger.Error().Err(err).Msg("failed to close mongo connection")
}
Expand Down Expand Up @@ -202,24 +200,11 @@ func NewEnginePBehavior(ctx context.Context, options Options, logger zerolog.Log
},
logger,
))
enginePbehavior.AddPeriodicalWorker("cleaner", engine.NewLockedPeriodicalWorker(
redis.NewLockClient(lockRedisSession),
redis.PbehaviorCleanPeriodicalLockKey,
&cleanPeriodicalWorker{
PeriodicalInterval: time.Hour,
TimezoneConfigProvider: timezoneConfigProvider,
DataStorageConfigProvider: dataStorageConfigProvider,
LimitConfigAdapter: datastorage.NewAdapter(dbClient),
Logger: logger,
},
logger,
))
enginePbehavior.AddPeriodicalWorker("config", engine.NewLoadConfigPeriodicalWorker(
options.PeriodicalWaitTime,
config.NewAdapter(dbClient),
logger,
timezoneConfigProvider,
dataStorageConfigProvider,
techMetricsConfigProvider,
))
enginePbehavior.AddPeriodicalWorker("rrule_cstart", computeRruleStartWorker)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
db.configuration.find({_id: "data_storage"}).forEach(function (doc) {
let set = {};
if (doc.history) {
const keys = [
"junit",
"remediation",
"pbehavior",
"health_check",
"webhook",
"event_filter_failure",
"event_records",
];
for (const key of keys) {
const v = doc.history[key];
if (v) {
set["history." + key] = v.time;
}
}
}

let update = {
$unset: {
"config.event_records.enabled": "",
}
};
if (Object.keys(set).length > 0) {
update["$set"] = set;
}

db.configuration.updateOne({_id: "data_storage"}, update);
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
db.configuration.find({_id: "data_storage"}).forEach(function (doc) {
let update = {};
if (doc.history) {
const keys = Object.keys(doc.history)
for (const key of keys) {
const v = doc.history[key];
if (isInt(v)) {
update["history." + key] = {
time: v,
};
}
}
}

if (doc.config && doc.config.event_records && doc.config.event_records.enabled === undefined) {
update["config.event_records.enabled"] = true;
}

if (Object.keys(update).length > 0) {
db.configuration.updateOne({_id: "data_storage"}, {$set: update});
}
});
6 changes: 3 additions & 3 deletions community/go-engines-community/lib/api/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func Default(
authorProvider, tplExecutor, json.NewDecoder(), logger)
alarmWatcher := alarmapi.NewWatcher(dbClient, websocketHub, alarmStore, json.NewEncoder(), json.NewDecoder(), logger)

messageRateWatcher := messageratestats.NewWatcher(websocketHub, messageratestats.NewStore(dbClient, pgPoolProvider),
messageRateWatcher := messageratestats.NewWatcher(websocketHub, messageratestats.NewStore(pgPoolProvider),
json.NewEncoder(), json.NewDecoder(), flags.IntegrationPeriodicalWaitTime, logger)

err = registerWebsocketGroups(websocketHub, alarmWatcher, messageRateWatcher)
Expand Down Expand Up @@ -270,11 +270,11 @@ func Default(
close(entityCleanerTaskChan)
close(broadcastMessageChan)

err := dbClient.Disconnect(ctx)
err := dbClient.Disconnect(context.WithoutCancel(ctx))
if err != nil {
logger.Error().Err(err).Msg("failed to close mongo connection")
}
err = dbExportClient.Disconnect(ctx)
err = dbExportClient.Disconnect(context.WithoutCancel(ctx))
if err != nil {
logger.Error().Err(err).Msg("failed to close mongo connection")
}
Expand Down
33 changes: 4 additions & 29 deletions community/go-engines-community/lib/api/docs/schemas_swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1490,7 +1490,7 @@ definitions:
event_records:
properties:
delete_after:
$ref: '#/definitions/datetime.DurationWithUnit'
$ref: '#/definitions/datetime.DurationWithEnabled'
type: object
health_check:
properties:
Expand Down Expand Up @@ -1539,34 +1539,9 @@ definitions:
config:
$ref: '#/definitions/datastorage.Config'
history:
$ref: '#/definitions/datastorage.History'
type: object
datastorage.History:
properties:
alarm:
$ref: '#/definitions/datastorage.HistoryWithCount'
alarm_external_tag:
$ref: '#/definitions/datastorage.HistoryWithCount'
entity_cleaned:
$ref: '#/definitions/datastorage.HistoryWithCount'
entity_disabled:
$ref: '#/definitions/datastorage.HistoryWithCount'
entity_unlinked:
$ref: '#/definitions/datastorage.HistoryWithCount'
event_filter_failure:
type: integer
event_records:
type: integer
health_check:
type: integer
junit:
type: integer
pbehavior:
type: integer
remediation:
type: integer
webhook:
type: integer
additionalProperties:
$ref: '#/definitions/datastorage.HistoryWithCount'
type: object
type: object
datastorage.HistoryWithCount:
properties:
Expand Down
Loading

0 comments on commit 8182230

Please sign in to comment.