Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions manager/pkg/cronjob/task/compliance_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,20 @@ var (
func SyncLocalCompliance(ctx context.Context, pool *pgxpool.Pool, enableSimulation bool, job gocron.Job) {
startTime = time.Now()

historyDate := startTime.AddDate(0, 0, -dateInterval)
log = ctrl.Log.WithName(localComplianceTaskName).WithValues("history", historyDate.Format(dateFormat))
log.Info("start running", "currentRun", job.LastRun().Format(timeFormat))

interval := dateInterval
if enableSimulation {
interval = simulationCounter
}

historyDate := startTime.AddDate(0, 0, -interval)
log = ctrl.Log.WithName(localComplianceTaskName).WithValues("history", historyDate.Format(dateFormat))
log.Info("start running", "currentRun", job.LastRun().Format(timeFormat))

// insert or update with local_status.compliance
statusTotal, statusInsert, err := syncToLocalComplianceHistoryByLocalStatus(ctx, pool, batchSize, interval)
if err != nil {
log.Error(err, "sync from local_status.compliance to history.local_compliance failed")
return
}
log.Info("with local_status.compliance", "totalCount", statusTotal, "insertedCount", statusInsert)

Expand All @@ -59,6 +60,7 @@ func SyncLocalCompliance(ctx context.Context, pool *pgxpool.Pool, enableSimulati
eventTotal, eventInsert, err := syncToLocalComplianceHistoryByPolicyEvent(ctx, pool, batchSize)
if err != nil {
log.Error(err, "sync from history.local_policies to history.local_compliance failed")
return
}
log.Info("with event.local_policies", "totalCount", eventTotal, "insertedCount", eventInsert)

Expand All @@ -68,12 +70,14 @@ func SyncLocalCompliance(ctx context.Context, pool *pgxpool.Pool, enableSimulati
func syncToLocalComplianceHistoryByLocalStatus(ctx context.Context, pool *pgxpool.Pool, batchSize int64, interval int) (
totalCount int64, insertedCount int64, err error,
) {
viewName := fmt.Sprintf("history.local_compliance_view_%s",
startTime.AddDate(0, 0, -interval).Format("2006_01_02"))
viewSchema := "history"
viewTable := fmt.Sprintf("local_compliance_view_%s", startTime.AddDate(0, 0, -interval).Format("2006_01_02"))
viewName := fmt.Sprintf("%s.%s", viewSchema, viewTable)
createViewTemplate := `
CREATE MATERIALIZED VIEW IF NOT EXISTS %s AS
SELECT policy_id,cluster_id,leaf_hub_name,compliance
FROM local_status.compliance;
CREATE MATERIALIZED VIEW IF NOT EXISTS %s AS
SELECT policy_id,cluster_id,leaf_hub_name,compliance
FROM local_status.compliance
WITH DATA;
CREATE INDEX IF NOT EXISTS idx_local_compliance_view ON %s (policy_id, cluster_id);
`
_, err = pool.Exec(ctx, fmt.Sprintf(createViewTemplate, viewName, viewName))
Expand Down Expand Up @@ -123,10 +127,12 @@ func insertToLocalComplianceHistoryByLocalStatus(ctx context.Context, tableName
err = wait.PollUntilWithContext(timeoutCtx, 5*time.Second, func(ctx context.Context) (done bool, err error) {
selectInsertSQLTemplate := `
INSERT INTO history.local_compliance (policy_id, cluster_id, leaf_hub_name, compliance, compliance_date)
SELECT policy_id,cluster_id,leaf_hub_name,compliance,(CURRENT_DATE - INTERVAL '%d day')
FROM %s
ORDER BY policy_id, cluster_id
LIMIT $1 OFFSET $2
(
SELECT policy_id,cluster_id,leaf_hub_name,compliance,(CURRENT_DATE - INTERVAL '%d day')
FROM %s
ORDER BY policy_id, cluster_id
LIMIT $1 OFFSET $2
)
ON CONFLICT (policy_id, cluster_id, compliance_date) DO NOTHING
`
selectInsertSQL := fmt.Sprintf(selectInsertSQLTemplate, interval, tableName)
Expand Down