diff --git a/.gitignore b/.gitignore index a847e74fa..04ac321d7 100644 --- a/.gitignore +++ b/.gitignore @@ -38,4 +38,4 @@ dist .pnp.* -tsconfig.tsbuildinfo \ No newline at end of file +tsconfig.tsbuildinfo diff --git a/e2e-test/application_sync_test.go b/e2e-test/application_sync_test.go index 3ac382af8..21ef2e989 100644 --- a/e2e-test/application_sync_test.go +++ b/e2e-test/application_sync_test.go @@ -114,9 +114,7 @@ var _ = Describe("Test the application synchronizing", func() { Expect(lrr.Records[1].Name).Should(Equal("test-v2")) // The workflow includes a suspend step. - if lrr.Records[1].Status != "suspending" { - return fmt.Errorf("the record status is %s, not suspending", lrr.Records[1].Status) - } + Expect(lrr.Records[1].Status).Should(Equal("suspending")) return nil }).WithTimeout(time.Minute * 1).WithPolling(3 * time.Second).Should(BeNil()) }) diff --git a/pkg/server/domain/service/workflow.go b/pkg/server/domain/service/workflow.go index 162e8afb9..9377c1c5f 100644 --- a/pkg/server/domain/service/workflow.go +++ b/pkg/server/domain/service/workflow.go @@ -24,8 +24,8 @@ import ( "io" "strconv" "strings" + "time" - "helm.sh/helm/v3/pkg/time" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" @@ -77,7 +77,7 @@ type WorkflowService interface { CreateWorkflowRecord(ctx context.Context, appModel *model.Application, app *v1beta1.Application, workflow *model.Workflow) (*model.WorkflowRecord, error) ListWorkflowRecords(ctx context.Context, workflow *model.Workflow, page, pageSize int) (*apisv1.ListWorkflowRecordsResponse, error) DetailWorkflowRecord(ctx context.Context, workflow *model.Workflow, recordName string) (*apisv1.DetailWorkflowRecordResponse, error) - SyncWorkflowRecord(ctx context.Context) error + SyncWorkflowRecord(ctx context.Context, appKey, recordName string, app *v1beta1.Application, workflowContext map[string]string) error ResumeRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName, stepName string) error TerminateRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName string) error RollbackRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName, revisionName string) (*apisv1.WorkflowRecordBase, error) @@ -353,104 +353,131 @@ func (w *workflowServiceImpl) DetailWorkflowRecord(ctx context.Context, workflow }, nil } -func (w *workflowServiceImpl) SyncWorkflowRecord(ctx context.Context) error { - var record = model.WorkflowRecord{ - Finished: "false", +// nolint:gocyclo +func (w *workflowServiceImpl) SyncWorkflowRecord(ctx context.Context, appPrimaryKey, recordName string, app *v1beta1.Application, workflowContext map[string]string) error { + if app == nil || app.Annotations == nil || app.Status.Workflow == nil { + return nil } - // list all unfinished workflow records - records, err := w.Store.List(ctx, &record, &datastore.ListOptions{}) - if err != nil { - return err + var record = &model.WorkflowRecord{ + AppPrimaryKey: appPrimaryKey, + Name: recordName, } - - for _, item := range records { - app := &v1beta1.Application{} - record := item.(*model.WorkflowRecord) - workflow := &model.Workflow{ - Name: record.WorkflowName, - AppPrimaryKey: record.AppPrimaryKey, + if err := w.Store.Get(ctx, record); err != nil { + if errors.Is(err, datastore.ErrRecordNotExist) { + return bcode.ErrWorkflowRecordNotExist } - if err := w.Store.Get(ctx, workflow); err != nil { - klog.ErrorS(err, "failed to get workflow", "app name", record.AppPrimaryKey, "workflow name", record.WorkflowName, "record name", record.Name) - continue + return err + } + var revision = &model.ApplicationRevision{AppPrimaryKey: appPrimaryKey, Version: record.RevisionPrimaryKey} + if err := w.Store.Get(ctx, revision); err != nil { + if errors.Is(err, datastore.ErrRecordNotExist) { + return bcode.ErrApplicationRevisionNotExist } - envbinding, err := w.EnvBindingService.GetEnvBinding(ctx, &model.Application{Name: record.AppPrimaryKey}, workflow.EnvName) - if err != nil { - klog.ErrorS(err, "failed to get envbinding", "app name", record.AppPrimaryKey, "workflow name", record.WorkflowName, "record name", record.Name) + return err + } + + if workflowContext != nil { + record.ContextValue = workflowContext + } + + status := app.Status.Workflow + record.Status = string(status.Phase) + record.Message = status.Message + record.Mode = status.Mode + + if cb := app.Status.Workflow.ContextBackend; cb != nil && workflowContext == nil && cb.Namespace != "" && cb.Name != "" { + var cm corev1.ConfigMap + if err := w.KubeClient.Get(ctx, types.NamespacedName{Namespace: cb.Namespace, Name: cb.Name}, &cm); err != nil { + klog.Errorf("failed to load the context values of the application %s:%s", app.Name, err.Error()) } - var appName string - if envbinding != nil { - appName = envbinding.AppDeployName + record.ContextValue = cm.Data + } + + stepStatus := make(map[string]*model.WorkflowStepStatus, len(status.Steps)) + stepAlias := make(map[string]string) + for _, step := range record.Steps { + stepAlias[step.Name] = step.Alias + for _, sub := range step.SubStepsStatus { + stepAlias[sub.Name] = sub.Alias } - if appName == "" { - appName = record.AppPrimaryKey + } + for _, step := range status.Steps { + stepStatus[step.Name] = &model.WorkflowStepStatus{ + StepStatus: convert.FromCRWorkflowStepStatus(step.StepStatus, stepAlias[step.Name]), + SubStepsStatus: make([]model.StepStatus, 0), } - if err := w.KubeClient.Get(ctx, types.NamespacedName{ - Name: appName, - Namespace: record.Namespace, - }, app); err != nil { - if apierrors.IsNotFound(err) { - klog.Warningf("can't find the application %s/%s, set the record status to terminated", appName, record.Namespace) - if err := w.setRecordToTerminated(ctx, record.AppPrimaryKey, record.Name); err != nil { - klog.Errorf("failed to set the record status to terminated %s", err.Error()) - } - continue - } - klog.ErrorS(err, "failed to get app", "oam app name", appName, "workflow name", record.WorkflowName, "record name", record.Name) - continue + for _, sub := range step.SubStepsStatus { + stepStatus[step.Name].SubStepsStatus = append(stepStatus[step.Name].SubStepsStatus, convert.FromCRWorkflowStepStatus(sub, stepAlias[sub.Name])) } - - if app.Status.Workflow == nil { - continue + } + for i, step := range record.Steps { + if stepStatus[step.Name] != nil { + record.Steps[i] = *stepStatus[step.Name] + } else { + record.Steps[i].Phase = "" + record.Steps[i].Message = "" + record.Steps[i].Reason = "" + record.Steps[i].FirstExecuteTime = time.Time{} + record.Steps[i].LastExecuteTime = time.Time{} } + } - // This means the application workflow has not run. - if app.Generation > app.Status.ObservedGeneration { - continue + // the auto generated workflow steps should be sync + if (len(record.Steps) == 0) && len(status.Steps) > 0 { + for k := range stepStatus { + record.Steps = append(record.Steps, *stepStatus[k]) } + } - // there is a ":" in the default app revision - recordName := strings.Replace(app.Status.Workflow.AppRevision, ":", "-", 1) + record.Finished = strconv.FormatBool(status.Finished) + record.EndTime = status.EndTime.Time + if err := w.Store.Put(ctx, record); err != nil { + return err + } - // try to sync the status from the running application - if app.Annotations != nil && app.Status.Workflow != nil && recordName == record.Name { - if err := w.syncWorkflowStatus(ctx, record.AppPrimaryKey, app, record.Name, app.Name, nil); err != nil { - klog.ErrorS(err, "failed to sync workflow status", "oam app name", appName, "workflow name", record.WorkflowName, "record name", record.Name) - } - } + revision.Status = generateRevisionStatus(status.Phase) + if app.Status.LatestRevision != nil { + revision.RevisionCRName = app.Status.LatestRevision.Name + } + if err := w.Store.Put(ctx, revision); err != nil { + return err + } - if record.Name == oam.GetPublishVersion(app) { - continue - } + if record.Finished == "true" { + klog.InfoS("successfully sync workflow status", "oam app name", app.Name, "workflow name", record.WorkflowName, "record name", record.Name, "status", record.Status, "sync source", app.Name) + } - // try to sync the status from the application revision + unfinished := &model.WorkflowRecord{ + Finished: "false", + } + // list all unfinished workflow records and sync with revision + unfinishedRecords, err := w.Store.List(ctx, unfinished, &datastore.ListOptions{}) + if err != nil { + return err + } + for _, item := range unfinishedRecords { + record := item.(*model.WorkflowRecord) var revision = &model.ApplicationRevision{AppPrimaryKey: record.AppPrimaryKey, Version: record.RevisionPrimaryKey} if err := w.Store.Get(ctx, revision); err != nil { if errors.Is(err, datastore.ErrRecordNotExist) { // If the application revision is not exist, the record do not need be synced - var record = &model.WorkflowRecord{ - AppPrimaryKey: record.AppPrimaryKey, - Name: recordName, - } - if err := w.Store.Get(ctx, record); err == nil { - record.Finished = "true" - record.Status = model.RevisionStatusFail - err := w.Store.Put(ctx, record) - if err != nil { - klog.Errorf("failed to set the workflow status is failure %s", err.Error()) - } - continue + record.Finished = "true" + record.Status = model.RevisionStatusFail + if err := w.Store.Put(ctx, record); err != nil { + klog.Errorf("failed to set the workflow status is failure %s", err.Error()) } + continue } klog.Errorf("failed to get the application revision from database %s", err.Error()) continue } - var appRevision v1beta1.ApplicationRevision if err := w.KubeClient.Get(ctx, types.NamespacedName{Namespace: app.Namespace, Name: revision.RevisionCRName}, &appRevision); err != nil { if apierrors.IsNotFound(err) { klog.Warningf("can't find the application revision %s/%s, set the record status to terminated", revision.RevisionCRName, app.Namespace) - if err := w.setRecordToTerminated(ctx, record.AppPrimaryKey, record.Name); err != nil { + record.Finished = "true" + record.Status = model.RevisionStatusTerminated + if err := w.Store.Put(ctx, record); err != nil { klog.Errorf("failed to set the record status to terminated %s", err.Error()) } continue @@ -458,7 +485,6 @@ func (w *workflowServiceImpl) SyncWorkflowRecord(ctx context.Context) error { klog.Warningf("failed to get the application revision %s", err.Error()) continue } - if appRevision.Status.Workflow != nil { appRevision.Spec.Application.Status.Workflow = appRevision.Status.Workflow if !appRevision.Spec.Application.Status.Workflow.Finished { @@ -466,149 +492,11 @@ func (w *workflowServiceImpl) SyncWorkflowRecord(ctx context.Context) error { appRevision.Spec.Application.Status.Workflow.Terminated = true } } - if err := w.syncWorkflowStatus(ctx, - record.AppPrimaryKey, - &appRevision.Spec.Application, - record.Name, - appRevision.Name, - appRevision.Status.WorkflowContext, - ); err != nil { - klog.ErrorS(err, "failed to sync workflow status", "oam app name", appName, "workflow name", record.WorkflowName, "record name", record.Name) + if err := w.SyncWorkflowRecord(ctx, record.AppPrimaryKey, record.Name, &appRevision.Spec.Application, appRevision.Status.WorkflowContext); err != nil { + klog.ErrorS(err, "failed to sync workflow status", "oam app name", app.Name, "workflow name", record.WorkflowName, "record name", record.Name) continue } } - - return nil -} - -func (w *workflowServiceImpl) setRecordToTerminated(ctx context.Context, appPrimaryKey, recordName string) error { - var record = &model.WorkflowRecord{ - AppPrimaryKey: appPrimaryKey, - Name: recordName, - } - if err := w.Store.Get(ctx, record); err != nil { - if errors.Is(err, datastore.ErrRecordNotExist) { - return bcode.ErrWorkflowRecordNotExist - } - return err - } - var revision = &model.ApplicationRevision{AppPrimaryKey: appPrimaryKey, Version: record.RevisionPrimaryKey} - if err := w.Store.Get(ctx, revision); err != nil { - if errors.Is(err, datastore.ErrRecordNotExist) { - return bcode.ErrApplicationRevisionNotExist - } - return err - } - record.Status = model.RevisionStatusTerminated - record.Finished = "true" - - revision.Status = model.RevisionStatusTerminated - - if err := w.Store.Put(ctx, record); err != nil { - return err - } - - if err := w.Store.Put(ctx, revision); err != nil { - return err - } - return nil -} - -func (w *workflowServiceImpl) syncWorkflowStatus(ctx context.Context, - appPrimaryKey string, - app *v1beta1.Application, - recordName, - source string, - workflowContext map[string]string) error { - var record = &model.WorkflowRecord{ - AppPrimaryKey: appPrimaryKey, - Name: recordName, - } - if err := w.Store.Get(ctx, record); err != nil { - if errors.Is(err, datastore.ErrRecordNotExist) { - return bcode.ErrWorkflowRecordNotExist - } - return err - } - var revision = &model.ApplicationRevision{AppPrimaryKey: appPrimaryKey, Version: record.RevisionPrimaryKey} - if err := w.Store.Get(ctx, revision); err != nil { - if errors.Is(err, datastore.ErrRecordNotExist) { - return bcode.ErrApplicationRevisionNotExist - } - return err - } - - if workflowContext != nil { - record.ContextValue = workflowContext - } - - if app.Status.Workflow != nil { - if app.Status.Workflow.AppRevision != record.Name { - klog.Warningf("the app(%s) revision is not match the record(%s), try next time..", app.Name, record.Name) - return nil - } - status := app.Status.Workflow - record.Status = string(status.Phase) - record.Message = status.Message - record.Mode = status.Mode - - if cb := app.Status.Workflow.ContextBackend; cb != nil && workflowContext == nil && cb.Namespace != "" && cb.Name != "" { - var cm corev1.ConfigMap - if err := w.KubeClient.Get(ctx, types.NamespacedName{Namespace: cb.Namespace, Name: cb.Name}, &cm); err != nil { - klog.Errorf("failed to load the context values of the application %s:%s", app.Name, err.Error()) - } - record.ContextValue = cm.Data - } - - stepStatus := make(map[string]*model.WorkflowStepStatus, len(status.Steps)) - stepAlias := make(map[string]string) - for _, step := range record.Steps { - stepAlias[step.Name] = step.Alias - for _, sub := range step.SubStepsStatus { - stepAlias[sub.Name] = sub.Alias - } - } - for _, step := range status.Steps { - stepStatus[step.Name] = &model.WorkflowStepStatus{ - StepStatus: convert.FromCRWorkflowStepStatus(step.StepStatus, stepAlias[step.Name]), - SubStepsStatus: make([]model.StepStatus, 0), - } - for _, sub := range step.SubStepsStatus { - stepStatus[step.Name].SubStepsStatus = append(stepStatus[step.Name].SubStepsStatus, convert.FromCRWorkflowStepStatus(sub, stepAlias[sub.Name])) - } - } - for i, step := range record.Steps { - if stepStatus[step.Name] != nil { - record.Steps[i] = *stepStatus[step.Name] - } - } - - // the auto generated workflow steps should be sync - if (len(record.Steps) == 0) && len(status.Steps) > 0 { - for k := range stepStatus { - record.Steps = append(record.Steps, *stepStatus[k]) - } - } - - record.Finished = strconv.FormatBool(status.Finished) - record.EndTime = status.EndTime.Time - if err := w.Store.Put(ctx, record); err != nil { - return err - } - - revision.Status = generateRevisionStatus(status.Phase) - if app.Status.LatestRevision != nil { - revision.RevisionCRName = app.Status.LatestRevision.Name - } - if err := w.Store.Put(ctx, revision); err != nil { - return err - } - } - - if record.Finished == "true" { - klog.InfoS("successfully sync workflow status", "oam app name", app.Name, "workflow name", record.WorkflowName, "record name", record.Name, "status", record.Status, "sync source", source) - } - return nil } @@ -637,13 +525,21 @@ func (w *workflowServiceImpl) CreateWorkflowRecord(ctx context.Context, appModel } steps := make([]model.WorkflowStepStatus, len(workflow.Steps)) for i, step := range workflow.Steps { + subStatus := make([]model.StepStatus, 0) + for _, sub := range step.SubSteps { + subStatus = append(subStatus, model.StepStatus{ + Name: sub.Name, + Alias: sub.Alias, + Type: sub.Type, + }) + } steps[i] = model.WorkflowStepStatus{ StepStatus: model.StepStatus{ Name: step.Name, Alias: step.Alias, Type: step.Type, }, - SubStepsStatus: make([]model.StepStatus, 0), + SubStepsStatus: subStatus, } for _, sub := range step.SubSteps { steps[i].SubStepsStatus = append(steps[i].SubStepsStatus, model.StepStatus{ @@ -662,7 +558,7 @@ func (w *workflowServiceImpl) CreateWorkflowRecord(ctx context.Context, appModel Name: app.Annotations[oam.AnnotationPublishVersion], Namespace: app.Namespace, Finished: "false", - StartTime: time.Now().Time, + StartTime: time.Now(), Steps: steps, Status: string(workflowv1alpha1.WorkflowStateInitializing), } @@ -699,6 +595,7 @@ func resetRevisionsAndRecords(ctx context.Context, ds datastore.DataStore, appNa if err := ds.Put(ctx, revision); err != nil { klog.Info("failed to set rest revisions' status to terminate", "app name", appName, "revision version", revision.Version, "error", err) } + klog.Info("set rest revisions' status to terminate", "app name", appName, "revision version", revision.Version) } } @@ -729,6 +626,7 @@ func resetRevisionsAndRecords(ctx context.Context, ds datastore.DataStore, appNa if err := ds.Put(ctx, record); err != nil { klog.Info("failed to set rest records' status to terminate", "app name", appName, "workflow name", record.WorkflowName, "record name", record.Name, "error", err) } + klog.Info("set rest records' status to terminate", "app name", appName, "workflow name", record.WorkflowName, "record name", record.Name) } } @@ -768,7 +666,7 @@ func (w *workflowServiceImpl) ResumeRecord(ctx context.Context, appModel *model. return err } - if err := w.syncWorkflowStatus(ctx, appModel.PrimaryKey(), oamApp, recordName, oamApp.Name, nil); err != nil { + if err := w.SyncWorkflowRecord(ctx, appModel.PrimaryKey(), recordName, oamApp, nil); err != nil { return err } @@ -783,7 +681,7 @@ func (w *workflowServiceImpl) TerminateRecord(ctx context.Context, appModel *mod if err := operation.TerminateWorkflow(ctx, w.KubeClient, oamApp); err != nil { return err } - if err := w.syncWorkflowStatus(ctx, appModel.PrimaryKey(), oamApp, recordName, oamApp.Name, nil); err != nil { + if err := w.SyncWorkflowRecord(ctx, appModel.PrimaryKey(), recordName, oamApp, nil); err != nil { return err } @@ -846,7 +744,7 @@ func (w *workflowServiceImpl) RollbackRecord(ctx context.Context, appModel *mode // update the original revision status to rollback originalRevision.Status = model.RevisionStatusRollback originalRevision.RollbackVersion = revisionVersion - originalRevision.UpdateTime = time.Now().Time + originalRevision.UpdateTime = time.Now() if err := w.Store.Put(ctx, originalRevision); err != nil { return nil, err } @@ -1132,3 +1030,10 @@ func convertWorkflowStep(stepStatus model.StepStatus) workflowv1alpha1.StepStatu Reason: stepStatus.Reason, } } + +// NewTestWorkflowService create the workflow service instance for testing +func NewTestWorkflowService(ds datastore.DataStore, c client.Client) WorkflowService { + return &workflowServiceImpl{ + Store: ds, KubeClient: c, + } +} diff --git a/pkg/server/domain/service/workflow_test.go b/pkg/server/domain/service/workflow_test.go index 3a5d7b4f7..f8d1bde0b 100644 --- a/pkg/server/domain/service/workflow_test.go +++ b/pkg/server/domain/service/workflow_test.go @@ -250,7 +250,7 @@ var _ = Describe("Test workflow service functions", func() { Name: appName, }, "test-workflow-2") Expect(err).Should(BeNil()) - _, err = workflowService.CreateWorkflowRecord(context.TODO(), &model.Application{ + re, err := workflowService.CreateWorkflowRecord(context.TODO(), &model.Application{ Name: appName, }, app, workflow) Expect(err).Should(BeNil()) @@ -274,7 +274,7 @@ var _ = Describe("Test workflow service functions", func() { app.Status.ObservedGeneration = 1 err = workflowService.KubeClient.Status().Patch(ctx, app, client.Merge) Expect(err).Should(BeNil()) - err = workflowService.SyncWorkflowRecord(ctx) + err = workflowService.SyncWorkflowRecord(ctx, appName, re.Name, app, nil) Expect(err).Should(BeNil()) workflow, err = workflowService.GetWorkflow(context.TODO(), &model.Application{ @@ -322,7 +322,6 @@ var _ = Describe("Test workflow service functions", func() { Expect(err).Should(BeNil()) By("create one application revision to test sync workflow record") - appWithRevision := &v1beta1.Application{} err = json.Unmarshal(raw, appWithRevision) Expect(err).Should(BeNil()) @@ -344,7 +343,7 @@ var _ = Describe("Test workflow service functions", func() { appRevision.Status.Workflow.AppRevision = app.Annotations[oam.AnnotationPublishVersion] err = workflowService.KubeClient.Status().Update(ctx, appRevision) Expect(err).Should(BeNil()) - err = workflowService.SyncWorkflowRecord(ctx) + err = workflowService.SyncWorkflowRecord(ctx, appName, "test-workflow-2-111", app, nil) Expect(err).Should(BeNil()) By("check the record") diff --git a/pkg/server/event/event.go b/pkg/server/event/event.go index 7a95e765b..ba80cdf43 100644 --- a/pkg/server/event/event.go +++ b/pkg/server/event/event.go @@ -35,15 +35,12 @@ type Worker interface { // InitEvent init all event worker func InitEvent(cfg config.Config) []interface{} { - workflow := &sync.WorkflowRecordSync{ - Duration: cfg.LeaderConfig.Duration, - } application := &sync.ApplicationSync{ Queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), } collect := &collect.InfoCalculateCronJob{} - workers = append(workers, workflow, application, collect) - return []interface{}{workflow, application, collect} + workers = append(workers, application, collect) + return []interface{}{application, collect} } // StartEventWorker start all event worker diff --git a/pkg/server/event/event_test.go b/pkg/server/event/event_test.go index 3f59b37eb..8f1e6a0d2 100644 --- a/pkg/server/event/event_test.go +++ b/pkg/server/event/event_test.go @@ -26,5 +26,5 @@ import ( func TestInitEvent(t *testing.T) { InitEvent(config.Config{}) - assert.Equal(t, len(workers), 3) + assert.Equal(t, len(workers), 2) } diff --git a/pkg/server/event/sync/cache.go b/pkg/server/event/sync/cache.go index 9cee8b656..6986b7f02 100644 --- a/pkg/server/event/sync/cache.go +++ b/pkg/server/event/sync/cache.go @@ -66,14 +66,20 @@ func (c *CR2UX) initCache(ctx context.Context) error { return nil } -func (c *CR2UX) shouldSync(ctx context.Context, targetApp *v1beta1.Application, del bool) bool { +func (c *CR2UX) appFromUX(targetApp *v1beta1.Application) bool { + if targetApp != nil && targetApp.Labels != nil && targetApp.Labels[types.LabelSourceOfTruth] == types.FromUX { + return true + } + return false +} + +func (c *CR2UX) appFromCLI(targetApp *v1beta1.Application) bool { if targetApp != nil && targetApp.Labels != nil { // the source is inner and is not the addon application, ignore it. if targetApp.Labels[types.LabelSourceOfTruth] == types.FromInner && targetApp.Labels[oam.LabelAddonName] == "" { return false } - // the source is UX, ignore it - if targetApp.Labels[types.LabelSourceOfTruth] == types.FromUX { + if c.appFromUX(targetApp) { return false } } @@ -84,7 +90,10 @@ func (c *CR2UX) shouldSync(ctx context.Context, targetApp *v1beta1.Application, return false } } + return true +} +func (c *CR2UX) shouldSyncMetaFromCLI(ctx context.Context, targetApp *v1beta1.Application, del bool) bool { key := formatAppComposedName(targetApp.Name, targetApp.Namespace) cachedData, ok := c.cache.Load(key) if ok { diff --git a/pkg/server/event/sync/cache_test.go b/pkg/server/event/sync/cache_test.go index e12a96e8d..af09368c5 100644 --- a/pkg/server/event/sync/cache_test.go +++ b/pkg/server/event/sync/cache_test.go @@ -18,7 +18,6 @@ package sync import ( "context" - "sync" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -51,7 +50,7 @@ var _ = Describe("Test Cache", func() { err = k8sClient.Create(context.TODO(), &ns) Expect(err).Should(SatisfyAny(BeNil(), &util.AlreadyExistMatcher{})) - cr2ux := CR2UX{ds: ds, cli: k8sClient, cache: sync.Map{}} + cr2ux := newCR2UX(ds) ctx := context.Background() Expect(ds.Add(ctx, &model.Application{Name: "app1"})).Should(BeNil()) @@ -70,7 +69,7 @@ var _ = Describe("Test Cache", func() { app1 := &v1beta1.Application{} app1.Name = "app1" app1.Namespace = "app1-ns" - Expect(cr2ux.shouldSync(ctx, app1, false)).Should(BeEquivalentTo(true)) + Expect(cr2ux.shouldSyncMetaFromCLI(ctx, app1, false)).Should(BeEquivalentTo(true)) app2 := &v1beta1.Application{} app2.Name = "app2" @@ -78,21 +77,21 @@ var _ = Describe("Test Cache", func() { app2.Generation = 1 app2.Status.LatestRevision = &common.Revision{Name: "v1"} - Expect(cr2ux.shouldSync(ctx, app2, false)).Should(BeEquivalentTo(true)) + Expect(cr2ux.shouldSyncMetaFromCLI(ctx, app2, false)).Should(BeEquivalentTo(true)) // Only need to sync once. cr2ux.syncCache(formatAppComposedName(app2.Name, app2.Namespace), "v1", 1) - Expect(cr2ux.shouldSync(ctx, app2, false)).Should(BeEquivalentTo(false)) + Expect(cr2ux.shouldSyncMetaFromCLI(ctx, app2, false)).Should(BeEquivalentTo(false)) app3 := &v1beta1.Application{} app3.Name = "app3" app3.Namespace = "app3-ns" app3.ResourceVersion = "3" app3.Labels = map[string]string{ - velatypes.LabelSourceOfTruth: velatypes.FromUX, + velatypes.LabelSourceOfTruth: velatypes.FromInner, } - Expect(cr2ux.shouldSync(ctx, app3, false)).Should(BeEquivalentTo(false)) + Expect(cr2ux.appFromCLI(app3)).Should(BeEquivalentTo(false)) Expect(ds.Put(ctx, &model.Application{Name: "app1", Labels: map[string]string{ model.LabelSyncRevision: "v1", @@ -100,10 +99,10 @@ var _ = Describe("Test Cache", func() { }})).Should(BeNil()) cr2ux.syncCache(formatAppComposedName(app1.Name, app1.Namespace), "v1", 0) app1.Status.LatestRevision = &common.Revision{Name: "v1"} - Expect(cr2ux.shouldSync(ctx, app1, false)).Should(BeEquivalentTo(false)) - Expect(cr2ux.shouldSync(ctx, app1, true)).Should(BeEquivalentTo(true)) + Expect(cr2ux.shouldSyncMetaFromCLI(ctx, app1, false)).Should(BeEquivalentTo(false)) + Expect(cr2ux.shouldSyncMetaFromCLI(ctx, app1, true)).Should(BeEquivalentTo(true)) Expect(ds.Delete(ctx, &model.Application{Name: "app1"})).Should(BeNil()) - Expect(cr2ux.shouldSync(ctx, app1, false)).Should(BeEquivalentTo(true)) + Expect(cr2ux.shouldSyncMetaFromCLI(ctx, app1, false)).Should(BeEquivalentTo(true)) }) It("Test don't cache with from inner system label", func() { @@ -117,7 +116,7 @@ var _ = Describe("Test Cache", func() { err = k8sClient.Create(context.TODO(), &ns) Expect(err).Should(SatisfyAny(BeNil(), &util.AlreadyExistMatcher{})) - cr2ux := CR2UX{ds: ds, cli: k8sClient, cache: sync.Map{}} + cr2ux := newCR2UX(ds) ctx := context.Background() app1 := &v1beta1.Application{} @@ -128,6 +127,6 @@ var _ = Describe("Test Cache", func() { app1.Labels = make(map[string]string) app1.Labels[velatypes.LabelSourceOfTruth] = velatypes.FromInner Expect(k8sClient.Create(ctx, app1)).Should(BeNil()) - Expect(cr2ux.shouldSync(ctx, app1, false)).Should(BeEquivalentTo(false)) + Expect(cr2ux.appFromCLI(app1)).Should(BeEquivalentTo(false)) }) }) diff --git a/pkg/server/event/sync/cr2ux.go b/pkg/server/event/sync/cr2ux.go index 79964faee..2bf0079e1 100644 --- a/pkg/server/event/sync/cr2ux.go +++ b/pkg/server/event/sync/cr2ux.go @@ -18,12 +18,15 @@ package sync import ( "context" + "fmt" + "strings" "sync" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" + "github.com/oam-dev/kubevela/pkg/oam" "github.com/kubevela/velaux/pkg/server/domain/model" "github.com/kubevela/velaux/pkg/server/domain/service" @@ -59,6 +62,7 @@ type CR2UX struct { cache sync.Map projectService service.ProjectService applicationService service.ApplicationService + workflowService service.WorkflowService targetService service.TargetService envService service.EnvService } @@ -73,77 +77,112 @@ func (c *CR2UX) getAppMetaName(ctx context.Context, name, namespace string) stri return appName } -// AddOrUpdate will sync application CR to storage of VelaUX automatically -func (c *CR2UX) AddOrUpdate(ctx context.Context, targetApp *v1beta1.Application) error { - ds := c.ds - if !c.shouldSync(ctx, targetApp, false) { - return nil +func (c *CR2UX) syncAppCreatedByUX(ctx context.Context, targetApp *v1beta1.Application) error { + appPrimaryKey := targetApp.Annotations[oam.AnnotationAppName] + if appPrimaryKey == "" { + return fmt.Errorf("appName is empty in application %s", targetApp.Name) } - - dsApp, err := c.ConvertApp2DatastoreApp(ctx, targetApp) - if err != nil { - klog.Errorf("Convert App to data store err %v", err) + if targetApp.Annotations == nil || targetApp.Annotations[oam.AnnotationPublishVersion] == "" { + klog.Warningf("app %s/%s has no publish version, skip sync workflow status", targetApp.Namespace, targetApp.Name) + } + recordName := targetApp.Annotations[oam.AnnotationPublishVersion] + if err := c.workflowService.SyncWorkflowRecord(ctx, appPrimaryKey, recordName, targetApp, nil); err != nil { + klog.ErrorS(err, "failed to sync workflow status", "oam app name", targetApp.Name, "workflow name", oam.GetPublishVersion(targetApp), "record name", recordName) return err } - if dsApp.Project != nil { - if err = StoreProject(ctx, *dsApp.Project, ds, c.projectService); err != nil { - klog.Errorf("get or create project for sync process err %v", err) + return nil +} + +func (c *CR2UX) syncAppCreatedByCLI(ctx context.Context, targetApp *v1beta1.Application) error { + if c.shouldSyncMetaFromCLI(ctx, targetApp, false) { + ds := c.ds + dsApp, err := c.ConvertApp2DatastoreApp(ctx, targetApp) + if err != nil { + klog.Errorf("Convert App to data store err %v", err) return err } - } + if dsApp.Project != nil { + if err = StoreProject(ctx, *dsApp.Project, ds, c.projectService); err != nil { + klog.Errorf("get or create project for sync process err %v", err) + return err + } + } - if err = StoreTargets(ctx, dsApp, ds, c.targetService); err != nil { - klog.Errorf("Store targets to data store err %v", err) - return err - } + if err = StoreTargets(ctx, dsApp, ds, c.targetService); err != nil { + klog.Errorf("Store targets to data store err %v", err) + return err + } - if err = StoreEnv(ctx, dsApp, ds, c.envService); err != nil { - klog.Errorf("Store Env Metadata to data store err %v", err) - return err - } - if err = StoreEnvBinding(ctx, dsApp.Eb, ds); err != nil { - klog.Errorf("Store EnvBinding Metadata to data store err %v", err) - return err - } - if err = StoreComponents(ctx, dsApp.AppMeta.Name, dsApp.Comps, ds); err != nil { - klog.Errorf("Store Components Metadata to data store err %v", err) - return err - } - if err = StorePolicy(ctx, dsApp.AppMeta.Name, dsApp.Policies, ds); err != nil { - klog.Errorf("Store Policy Metadata to data store err %v", err) - return err - } - if err = StoreWorkflow(ctx, dsApp, ds); err != nil { - klog.Errorf("Store Workflow Metadata to data store err %v", err) - return err - } + if err = StoreEnv(ctx, dsApp, ds, c.envService); err != nil { + klog.Errorf("Store Env Metadata to data store err %v", err) + return err + } + if err = StoreEnvBinding(ctx, dsApp.Eb, ds); err != nil { + klog.Errorf("Store EnvBinding Metadata to data store err %v", err) + return err + } + if err = StoreComponents(ctx, dsApp.AppMeta.Name, dsApp.Comps, ds); err != nil { + klog.Errorf("Store Components Metadata to data store err %v", err) + return err + } + if err = StorePolicy(ctx, dsApp.AppMeta.Name, dsApp.Policies, ds); err != nil { + klog.Errorf("Store Policy Metadata to data store err %v", err) + return err + } + if err = StoreWorkflow(ctx, dsApp, ds); err != nil { + klog.Errorf("Store Workflow Metadata to data store err %v", err) + return err + } - if err = StoreApplicationRevision(ctx, dsApp, ds); err != nil { - klog.Errorf("Store application revision to data store err %v", err) - return err - } + if err = StoreApplicationRevision(ctx, dsApp, ds); err != nil { + klog.Errorf("Store application revision to data store err %v", err) + return err + } - if err = StoreWorkflowRecord(ctx, dsApp, ds); err != nil { - klog.Errorf("Store Workflow Record to data store err %v", err) - return err + if err = StoreWorkflowRecord(ctx, dsApp, ds); err != nil { + klog.Errorf("Store Workflow Record to data store err %v", err) + return err + } + + if err = StoreAppMeta(ctx, dsApp, ds); err != nil { + klog.Errorf("Store App Metadata to data store err %v", err) + return err + } + + // update cache + key := formatAppComposedName(targetApp.Name, targetApp.Namespace) + syncedVersion := getSyncedRevision(dsApp.Revision) + c.syncCache(key, syncedVersion, int64(len(dsApp.Targets))) + klog.Infof("application %s/%s revision %s synced successful", targetApp.Name, targetApp.Namespace, syncedVersion) } - if err = StoreAppMeta(ctx, dsApp, ds); err != nil { - klog.Errorf("Store App Metadata to data store err %v", err) - return err + recordName := oam.GetPublishVersion(targetApp) + if recordName == "" { + if targetApp.Status.Workflow != nil { + recordName = strings.Replace(targetApp.Status.Workflow.AppRevision, ":", "-", 1) + } else { + klog.Warningf("app %s/%s has no publish version or revision in status, skip sync workflow status", targetApp.Namespace, targetApp.Name) + } } + return c.workflowService.SyncWorkflowRecord(ctx, c.getAppMetaName(ctx, targetApp.Name, targetApp.Namespace), recordName, targetApp, nil) +} - // update cache - key := formatAppComposedName(targetApp.Name, targetApp.Namespace) - syncedVersion := getSyncedRevision(dsApp.Revision) - c.syncCache(key, syncedVersion, int64(len(dsApp.Targets))) - klog.Infof("application %s/%s revision %s synced successful", targetApp.Name, targetApp.Namespace, syncedVersion) +// AddOrUpdate will sync application CR to storage of VelaUX automatically +func (c *CR2UX) AddOrUpdate(ctx context.Context, targetApp *v1beta1.Application) error { + switch { + case c.appFromUX(targetApp): + return c.syncAppCreatedByUX(ctx, targetApp) + case c.appFromCLI(targetApp): + return c.syncAppCreatedByCLI(ctx, targetApp) + default: + klog.Infof("skip syncing application %s/%s", targetApp.Name, targetApp.Namespace) + } return nil } // DeleteApp will delete the application as the CR was deleted func (c *CR2UX) DeleteApp(ctx context.Context, targetApp *v1beta1.Application) error { - if !c.shouldSync(ctx, targetApp, true) { + if !c.appFromCLI(targetApp) && !c.shouldSyncMetaFromCLI(ctx, targetApp, true) { return nil } app, appName, err := c.getApp(ctx, targetApp.Name, targetApp.Namespace) diff --git a/pkg/server/event/sync/cr2ux_test.go b/pkg/server/event/sync/cr2ux_test.go index 1f7773847..3f676c8e4 100644 --- a/pkg/server/event/sync/cr2ux_test.go +++ b/pkg/server/event/sync/cr2ux_test.go @@ -268,6 +268,7 @@ func newCR2UX(ds datastore.DataStore) *CR2UX { targetService := service.NewTestTargetService(ds, k8sClient) envService := service.NewTestEnvService(ds, k8sClient) userService := service.NewTestUserService(ds, k8sClient) + workflowService := service.NewTestWorkflowService(ds, k8sClient) err := userService.Init(context.TODO()) Expect(err).Should(BeNil()) return &CR2UX{ @@ -278,5 +279,6 @@ func newCR2UX(ds datastore.DataStore) *CR2UX { targetService: targetService, envService: envService, applicationService: applicationService, + workflowService: workflowService, } } diff --git a/pkg/server/event/sync/worker.go b/pkg/server/event/sync/worker.go index 1cffc2a83..b86d21330 100644 --- a/pkg/server/event/sync/worker.go +++ b/pkg/server/event/sync/worker.go @@ -45,6 +45,7 @@ type ApplicationSync struct { Store datastore.DataStore `inject:"datastore"` ProjectService service.ProjectService `inject:""` ApplicationService service.ApplicationService `inject:""` + WorkflowService service.WorkflowService `inject:""` TargetService service.TargetService `inject:""` EnvService service.EnvService `inject:""` Queue workqueue.RateLimitingInterface @@ -78,6 +79,7 @@ func (a *ApplicationSync) Start(ctx context.Context, errorChan chan error) { cache: sync.Map{}, projectService: a.ProjectService, applicationService: a.ApplicationService, + workflowService: a.WorkflowService, targetService: a.TargetService, envService: a.EnvService, } @@ -87,12 +89,13 @@ func (a *ApplicationSync) Start(ctx context.Context, errorChan chan error) { go func() { for { - app, down := a.Queue.Get() + item, down := a.Queue.Get() if down { break } - if err := cu.AddOrUpdate(ctx, app.(*v1beta1.Application)); err != nil { - klog.Errorf("fail to add or update application %s", err.Error()) + app := item.(*v1beta1.Application) + if err := cu.AddOrUpdate(ctx, app); err != nil { + klog.Errorf("fail to add or update application %s: %s", app.Name, err.Error()) } a.Queue.Done(app) } diff --git a/pkg/server/event/sync/worker_test.go b/pkg/server/event/sync/worker_test.go index 04327c69e..4a7aedaef 100644 --- a/pkg/server/event/sync/worker_test.go +++ b/pkg/server/event/sync/worker_test.go @@ -74,6 +74,7 @@ var _ = Describe("Test Worker CR sync to datastore", func() { ProjectService: crux.projectService, ApplicationService: crux.applicationService, TargetService: crux.targetService, + WorkflowService: crux.workflowService, EnvService: crux.envService, } go appSync.Start(ctx, make(chan error)) diff --git a/pkg/server/event/sync/workflow_record.go b/pkg/server/event/sync/workflow_record.go deleted file mode 100644 index e8e0dd544..000000000 --- a/pkg/server/event/sync/workflow_record.go +++ /dev/null @@ -1,50 +0,0 @@ -/* -Copyright 2022 The KubeVela Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sync - -import ( - "context" - "time" - - "k8s.io/klog/v2" - - "github.com/kubevela/velaux/pkg/server/domain/service" -) - -// WorkflowRecordSync sync workflow record from cluster to database -type WorkflowRecordSync struct { - Duration time.Duration - WorkflowService service.WorkflowService `inject:""` -} - -// Start sync workflow record data -func (w *WorkflowRecordSync) Start(ctx context.Context, errorChan chan error) { - klog.Infof("workflow record syncing worker started") - defer klog.Infof("workflow record syncing worker closed") - t := time.NewTicker(w.Duration) - defer t.Stop() - for { - select { - case <-t.C: - if err := w.WorkflowService.SyncWorkflowRecord(ctx); err != nil { - klog.Errorf("syncWorkflowRecordError: %s", err.Error()) - } - case <-ctx.Done(): - return - } - } -}