diff --git a/manager/pkg/cronjob/task/compliance_history.go b/manager/pkg/cronjob/task/compliance_history.go index d15c42863b..85c9ee7b45 100644 --- a/manager/pkg/cronjob/task/compliance_history.go +++ b/manager/pkg/cronjob/task/compliance_history.go @@ -34,19 +34,20 @@ var ( func SyncLocalCompliance(ctx context.Context, pool *pgxpool.Pool, enableSimulation bool, job gocron.Job) { startTime = time.Now() - historyDate := startTime.AddDate(0, 0, -dateInterval) - log = ctrl.Log.WithName(localComplianceTaskName).WithValues("history", historyDate.Format(dateFormat)) - log.Info("start running", "currentRun", job.LastRun().Format(timeFormat)) - interval := dateInterval if enableSimulation { interval = simulationCounter } + historyDate := startTime.AddDate(0, 0, -interval) + log = ctrl.Log.WithName(localComplianceTaskName).WithValues("history", historyDate.Format(dateFormat)) + log.Info("start running", "currentRun", job.LastRun().Format(timeFormat)) + // insert or update with local_status.compliance statusTotal, statusInsert, err := syncToLocalComplianceHistoryByLocalStatus(ctx, pool, batchSize, interval) if err != nil { log.Error(err, "sync from local_status.compliance to history.local_compliance failed") + return } log.Info("with local_status.compliance", "totalCount", statusTotal, "insertedCount", statusInsert) @@ -59,6 +60,7 @@ func SyncLocalCompliance(ctx context.Context, pool *pgxpool.Pool, enableSimulati eventTotal, eventInsert, err := syncToLocalComplianceHistoryByPolicyEvent(ctx, pool, batchSize) if err != nil { log.Error(err, "sync from history.local_policies to history.local_compliance failed") + return } log.Info("with event.local_policies", "totalCount", eventTotal, "insertedCount", eventInsert) @@ -68,12 +70,14 @@ func SyncLocalCompliance(ctx context.Context, pool *pgxpool.Pool, enableSimulati func syncToLocalComplianceHistoryByLocalStatus(ctx context.Context, pool *pgxpool.Pool, batchSize int64, interval int) ( totalCount int64, insertedCount int64, err error, ) { - viewName := fmt.Sprintf("history.local_compliance_view_%s", - startTime.AddDate(0, 0, -interval).Format("2006_01_02")) + viewSchema := "history" + viewTable := fmt.Sprintf("local_compliance_view_%s", startTime.AddDate(0, 0, -interval).Format("2006_01_02")) + viewName := fmt.Sprintf("%s.%s", viewSchema, viewTable) createViewTemplate := ` - CREATE MATERIALIZED VIEW IF NOT EXISTS %s AS - SELECT policy_id,cluster_id,leaf_hub_name,compliance - FROM local_status.compliance; + CREATE MATERIALIZED VIEW IF NOT EXISTS %s AS + SELECT policy_id,cluster_id,leaf_hub_name,compliance + FROM local_status.compliance + WITH DATA; CREATE INDEX IF NOT EXISTS idx_local_compliance_view ON %s (policy_id, cluster_id); ` _, err = pool.Exec(ctx, fmt.Sprintf(createViewTemplate, viewName, viewName)) @@ -123,10 +127,12 @@ func insertToLocalComplianceHistoryByLocalStatus(ctx context.Context, tableName err = wait.PollUntilWithContext(timeoutCtx, 5*time.Second, func(ctx context.Context) (done bool, err error) { selectInsertSQLTemplate := ` INSERT INTO history.local_compliance (policy_id, cluster_id, leaf_hub_name, compliance, compliance_date) - SELECT policy_id,cluster_id,leaf_hub_name,compliance,(CURRENT_DATE - INTERVAL '%d day') - FROM %s - ORDER BY policy_id, cluster_id - LIMIT $1 OFFSET $2 + ( + SELECT policy_id,cluster_id,leaf_hub_name,compliance,(CURRENT_DATE - INTERVAL '%d day') + FROM %s + ORDER BY policy_id, cluster_id + LIMIT $1 OFFSET $2 + ) ON CONFLICT (policy_id, cluster_id, compliance_date) DO NOTHING ` selectInsertSQL := fmt.Sprintf(selectInsertSQLTemplate, interval, tableName)