diff --git a/.gitignore b/.gitignore index a847e74fa..04ac321d7 100644 --- a/.gitignore +++ b/.gitignore @@ -38,4 +38,4 @@ dist .pnp.* -tsconfig.tsbuildinfo \ No newline at end of file +tsconfig.tsbuildinfo diff --git a/pkg/server/domain/service/workflow.go b/pkg/server/domain/service/workflow.go index 162e8afb9..bf8cc6aa9 100644 --- a/pkg/server/domain/service/workflow.go +++ b/pkg/server/domain/service/workflow.go @@ -24,10 +24,9 @@ import ( "io" "strconv" "strings" + "time" - "helm.sh/helm/v3/pkg/time" corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "k8s.io/klog/v2" @@ -77,7 +76,7 @@ type WorkflowService interface { CreateWorkflowRecord(ctx context.Context, appModel *model.Application, app *v1beta1.Application, workflow *model.Workflow) (*model.WorkflowRecord, error) ListWorkflowRecords(ctx context.Context, workflow *model.Workflow, page, pageSize int) (*apisv1.ListWorkflowRecordsResponse, error) DetailWorkflowRecord(ctx context.Context, workflow *model.Workflow, recordName string) (*apisv1.DetailWorkflowRecordResponse, error) - SyncWorkflowRecord(ctx context.Context) error + SyncWorkflowRecord(ctx context.Context, appKey, recordName string, app *v1beta1.Application, workflowContext map[string]string) error ResumeRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName, stepName string) error TerminateRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName string) error RollbackRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName, revisionName string) (*apisv1.WorkflowRecordBase, error) @@ -353,135 +352,10 @@ func (w *workflowServiceImpl) DetailWorkflowRecord(ctx context.Context, workflow }, nil } -func (w *workflowServiceImpl) SyncWorkflowRecord(ctx context.Context) error { - var record = model.WorkflowRecord{ - Finished: "false", +func (w *workflowServiceImpl) SyncWorkflowRecord(ctx context.Context, appPrimaryKey, recordName string, app *v1beta1.Application, workflowContext map[string]string) error { + if app == nil || app.Annotations == nil || app.Status.Workflow == nil { + return nil } - // list all unfinished workflow records - records, err := w.Store.List(ctx, &record, &datastore.ListOptions{}) - if err != nil { - return err - } - - for _, item := range records { - app := &v1beta1.Application{} - record := item.(*model.WorkflowRecord) - workflow := &model.Workflow{ - Name: record.WorkflowName, - AppPrimaryKey: record.AppPrimaryKey, - } - if err := w.Store.Get(ctx, workflow); err != nil { - klog.ErrorS(err, "failed to get workflow", "app name", record.AppPrimaryKey, "workflow name", record.WorkflowName, "record name", record.Name) - continue - } - envbinding, err := w.EnvBindingService.GetEnvBinding(ctx, &model.Application{Name: record.AppPrimaryKey}, workflow.EnvName) - if err != nil { - klog.ErrorS(err, "failed to get envbinding", "app name", record.AppPrimaryKey, "workflow name", record.WorkflowName, "record name", record.Name) - } - var appName string - if envbinding != nil { - appName = envbinding.AppDeployName - } - if appName == "" { - appName = record.AppPrimaryKey - } - if err := w.KubeClient.Get(ctx, types.NamespacedName{ - Name: appName, - Namespace: record.Namespace, - }, app); err != nil { - if apierrors.IsNotFound(err) { - klog.Warningf("can't find the application %s/%s, set the record status to terminated", appName, record.Namespace) - if err := w.setRecordToTerminated(ctx, record.AppPrimaryKey, record.Name); err != nil { - klog.Errorf("failed to set the record status to terminated %s", err.Error()) - } - continue - } - klog.ErrorS(err, "failed to get app", "oam app name", appName, "workflow name", record.WorkflowName, "record name", record.Name) - continue - } - - if app.Status.Workflow == nil { - continue - } - - // This means the application workflow has not run. - if app.Generation > app.Status.ObservedGeneration { - continue - } - - // there is a ":" in the default app revision - recordName := strings.Replace(app.Status.Workflow.AppRevision, ":", "-", 1) - - // try to sync the status from the running application - if app.Annotations != nil && app.Status.Workflow != nil && recordName == record.Name { - if err := w.syncWorkflowStatus(ctx, record.AppPrimaryKey, app, record.Name, app.Name, nil); err != nil { - klog.ErrorS(err, "failed to sync workflow status", "oam app name", appName, "workflow name", record.WorkflowName, "record name", record.Name) - } - } - - if record.Name == oam.GetPublishVersion(app) { - continue - } - - // try to sync the status from the application revision - var revision = &model.ApplicationRevision{AppPrimaryKey: record.AppPrimaryKey, Version: record.RevisionPrimaryKey} - if err := w.Store.Get(ctx, revision); err != nil { - if errors.Is(err, datastore.ErrRecordNotExist) { - // If the application revision is not exist, the record do not need be synced - var record = &model.WorkflowRecord{ - AppPrimaryKey: record.AppPrimaryKey, - Name: recordName, - } - if err := w.Store.Get(ctx, record); err == nil { - record.Finished = "true" - record.Status = model.RevisionStatusFail - err := w.Store.Put(ctx, record) - if err != nil { - klog.Errorf("failed to set the workflow status is failure %s", err.Error()) - } - continue - } - } - klog.Errorf("failed to get the application revision from database %s", err.Error()) - continue - } - - var appRevision v1beta1.ApplicationRevision - if err := w.KubeClient.Get(ctx, types.NamespacedName{Namespace: app.Namespace, Name: revision.RevisionCRName}, &appRevision); err != nil { - if apierrors.IsNotFound(err) { - klog.Warningf("can't find the application revision %s/%s, set the record status to terminated", revision.RevisionCRName, app.Namespace) - if err := w.setRecordToTerminated(ctx, record.AppPrimaryKey, record.Name); err != nil { - klog.Errorf("failed to set the record status to terminated %s", err.Error()) - } - continue - } - klog.Warningf("failed to get the application revision %s", err.Error()) - continue - } - - if appRevision.Status.Workflow != nil { - appRevision.Spec.Application.Status.Workflow = appRevision.Status.Workflow - if !appRevision.Spec.Application.Status.Workflow.Finished { - appRevision.Spec.Application.Status.Workflow.Finished = true - appRevision.Spec.Application.Status.Workflow.Terminated = true - } - } - if err := w.syncWorkflowStatus(ctx, - record.AppPrimaryKey, - &appRevision.Spec.Application, - record.Name, - appRevision.Name, - appRevision.Status.WorkflowContext, - ); err != nil { - klog.ErrorS(err, "failed to sync workflow status", "oam app name", appName, "workflow name", record.WorkflowName, "record name", record.Name) - continue - } - } - - return nil -} - -func (w *workflowServiceImpl) setRecordToTerminated(ctx context.Context, appPrimaryKey, recordName string) error { var record = &model.WorkflowRecord{ AppPrimaryKey: appPrimaryKey, Name: recordName, @@ -499,114 +373,76 @@ func (w *workflowServiceImpl) setRecordToTerminated(ctx context.Context, appPrim } return err } - record.Status = model.RevisionStatusTerminated - record.Finished = "true" - revision.Status = model.RevisionStatusTerminated - - if err := w.Store.Put(ctx, record); err != nil { - return err + if workflowContext != nil { + record.ContextValue = workflowContext } - if err := w.Store.Put(ctx, revision); err != nil { - return err - } - return nil -} + status := app.Status.Workflow + record.Status = string(status.Phase) + record.Message = status.Message + record.Mode = status.Mode -func (w *workflowServiceImpl) syncWorkflowStatus(ctx context.Context, - appPrimaryKey string, - app *v1beta1.Application, - recordName, - source string, - workflowContext map[string]string) error { - var record = &model.WorkflowRecord{ - AppPrimaryKey: appPrimaryKey, - Name: recordName, - } - if err := w.Store.Get(ctx, record); err != nil { - if errors.Is(err, datastore.ErrRecordNotExist) { - return bcode.ErrWorkflowRecordNotExist - } - return err - } - var revision = &model.ApplicationRevision{AppPrimaryKey: appPrimaryKey, Version: record.RevisionPrimaryKey} - if err := w.Store.Get(ctx, revision); err != nil { - if errors.Is(err, datastore.ErrRecordNotExist) { - return bcode.ErrApplicationRevisionNotExist + if cb := app.Status.Workflow.ContextBackend; cb != nil && workflowContext == nil && cb.Namespace != "" && cb.Name != "" { + var cm corev1.ConfigMap + if err := w.KubeClient.Get(ctx, types.NamespacedName{Namespace: cb.Namespace, Name: cb.Name}, &cm); err != nil { + klog.Errorf("failed to load the context values of the application %s:%s", app.Name, err.Error()) } - return err + record.ContextValue = cm.Data } - if workflowContext != nil { - record.ContextValue = workflowContext - } - - if app.Status.Workflow != nil { - if app.Status.Workflow.AppRevision != record.Name { - klog.Warningf("the app(%s) revision is not match the record(%s), try next time..", app.Name, record.Name) - return nil - } - status := app.Status.Workflow - record.Status = string(status.Phase) - record.Message = status.Message - record.Mode = status.Mode - - if cb := app.Status.Workflow.ContextBackend; cb != nil && workflowContext == nil && cb.Namespace != "" && cb.Name != "" { - var cm corev1.ConfigMap - if err := w.KubeClient.Get(ctx, types.NamespacedName{Namespace: cb.Namespace, Name: cb.Name}, &cm); err != nil { - klog.Errorf("failed to load the context values of the application %s:%s", app.Name, err.Error()) - } - record.ContextValue = cm.Data + stepStatus := make(map[string]*model.WorkflowStepStatus, len(status.Steps)) + stepAlias := make(map[string]string) + for _, step := range record.Steps { + stepAlias[step.Name] = step.Alias + for _, sub := range step.SubStepsStatus { + stepAlias[sub.Name] = sub.Alias } - - stepStatus := make(map[string]*model.WorkflowStepStatus, len(status.Steps)) - stepAlias := make(map[string]string) - for _, step := range record.Steps { - stepAlias[step.Name] = step.Alias - for _, sub := range step.SubStepsStatus { - stepAlias[sub.Name] = sub.Alias - } + } + for _, step := range status.Steps { + stepStatus[step.Name] = &model.WorkflowStepStatus{ + StepStatus: convert.FromCRWorkflowStepStatus(step.StepStatus, stepAlias[step.Name]), + SubStepsStatus: make([]model.StepStatus, 0), } - for _, step := range status.Steps { - stepStatus[step.Name] = &model.WorkflowStepStatus{ - StepStatus: convert.FromCRWorkflowStepStatus(step.StepStatus, stepAlias[step.Name]), - SubStepsStatus: make([]model.StepStatus, 0), - } - for _, sub := range step.SubStepsStatus { - stepStatus[step.Name].SubStepsStatus = append(stepStatus[step.Name].SubStepsStatus, convert.FromCRWorkflowStepStatus(sub, stepAlias[sub.Name])) - } + for _, sub := range step.SubStepsStatus { + stepStatus[step.Name].SubStepsStatus = append(stepStatus[step.Name].SubStepsStatus, convert.FromCRWorkflowStepStatus(sub, stepAlias[sub.Name])) } - for i, step := range record.Steps { - if stepStatus[step.Name] != nil { - record.Steps[i] = *stepStatus[step.Name] - } + } + for i, step := range record.Steps { + if stepStatus[step.Name] != nil { + record.Steps[i] = *stepStatus[step.Name] + } else { + record.Steps[i].Phase = "" + record.Steps[i].Message = "" + record.Steps[i].Reason = "" + record.Steps[i].FirstExecuteTime = time.Time{} + record.Steps[i].LastExecuteTime = time.Time{} } + } - // the auto generated workflow steps should be sync - if (len(record.Steps) == 0) && len(status.Steps) > 0 { - for k := range stepStatus { - record.Steps = append(record.Steps, *stepStatus[k]) - } + // the auto generated workflow steps should be sync + if (len(record.Steps) == 0) && len(status.Steps) > 0 { + for k := range stepStatus { + record.Steps = append(record.Steps, *stepStatus[k]) } + } - record.Finished = strconv.FormatBool(status.Finished) - record.EndTime = status.EndTime.Time - if err := w.Store.Put(ctx, record); err != nil { - return err - } + record.Finished = strconv.FormatBool(status.Finished) + record.EndTime = status.EndTime.Time + if err := w.Store.Put(ctx, record); err != nil { + return err + } - revision.Status = generateRevisionStatus(status.Phase) - if app.Status.LatestRevision != nil { - revision.RevisionCRName = app.Status.LatestRevision.Name - } - if err := w.Store.Put(ctx, revision); err != nil { - return err - } + revision.Status = generateRevisionStatus(status.Phase) + if app.Status.LatestRevision != nil { + revision.RevisionCRName = app.Status.LatestRevision.Name + } + if err := w.Store.Put(ctx, revision); err != nil { + return err } if record.Finished == "true" { - klog.InfoS("successfully sync workflow status", "oam app name", app.Name, "workflow name", record.WorkflowName, "record name", record.Name, "status", record.Status, "sync source", source) + klog.InfoS("successfully sync workflow status", "oam app name", app.Name, "workflow name", record.WorkflowName, "record name", record.Name, "status", record.Status, "sync source", app.Name) } return nil @@ -662,7 +498,7 @@ func (w *workflowServiceImpl) CreateWorkflowRecord(ctx context.Context, appModel Name: app.Annotations[oam.AnnotationPublishVersion], Namespace: app.Namespace, Finished: "false", - StartTime: time.Now().Time, + StartTime: time.Now(), Steps: steps, Status: string(workflowv1alpha1.WorkflowStateInitializing), } @@ -699,6 +535,7 @@ func resetRevisionsAndRecords(ctx context.Context, ds datastore.DataStore, appNa if err := ds.Put(ctx, revision); err != nil { klog.Info("failed to set rest revisions' status to terminate", "app name", appName, "revision version", revision.Version, "error", err) } + klog.Info("set rest revisions' status to terminate", "app name", appName, "revision version", revision.Version) } } @@ -729,6 +566,7 @@ func resetRevisionsAndRecords(ctx context.Context, ds datastore.DataStore, appNa if err := ds.Put(ctx, record); err != nil { klog.Info("failed to set rest records' status to terminate", "app name", appName, "workflow name", record.WorkflowName, "record name", record.Name, "error", err) } + klog.Info("set rest records' status to terminate", "app name", appName, "workflow name", record.WorkflowName, "record name", record.Name) } } @@ -768,7 +606,7 @@ func (w *workflowServiceImpl) ResumeRecord(ctx context.Context, appModel *model. return err } - if err := w.syncWorkflowStatus(ctx, appModel.PrimaryKey(), oamApp, recordName, oamApp.Name, nil); err != nil { + if err := w.SyncWorkflowRecord(ctx, appModel.PrimaryKey(), recordName, oamApp, nil); err != nil { return err } @@ -783,7 +621,7 @@ func (w *workflowServiceImpl) TerminateRecord(ctx context.Context, appModel *mod if err := operation.TerminateWorkflow(ctx, w.KubeClient, oamApp); err != nil { return err } - if err := w.syncWorkflowStatus(ctx, appModel.PrimaryKey(), oamApp, recordName, oamApp.Name, nil); err != nil { + if err := w.SyncWorkflowRecord(ctx, appModel.PrimaryKey(), recordName, oamApp, nil); err != nil { return err } @@ -846,7 +684,7 @@ func (w *workflowServiceImpl) RollbackRecord(ctx context.Context, appModel *mode // update the original revision status to rollback originalRevision.Status = model.RevisionStatusRollback originalRevision.RollbackVersion = revisionVersion - originalRevision.UpdateTime = time.Now().Time + originalRevision.UpdateTime = time.Now() if err := w.Store.Put(ctx, originalRevision); err != nil { return nil, err } diff --git a/pkg/server/domain/service/workflow_test.go b/pkg/server/domain/service/workflow_test.go index 3a5d7b4f7..d481c19a1 100644 --- a/pkg/server/domain/service/workflow_test.go +++ b/pkg/server/domain/service/workflow_test.go @@ -27,7 +27,6 @@ import ( workflowv1alpha1 "github.com/kubevela/workflow/api/v1alpha1" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" @@ -250,7 +249,7 @@ var _ = Describe("Test workflow service functions", func() { Name: appName, }, "test-workflow-2") Expect(err).Should(BeNil()) - _, err = workflowService.CreateWorkflowRecord(context.TODO(), &model.Application{ + re, err := workflowService.CreateWorkflowRecord(context.TODO(), &model.Application{ Name: appName, }, app, workflow) Expect(err).Should(BeNil()) @@ -274,7 +273,7 @@ var _ = Describe("Test workflow service functions", func() { app.Status.ObservedGeneration = 1 err = workflowService.KubeClient.Status().Patch(ctx, app, client.Merge) Expect(err).Should(BeNil()) - err = workflowService.SyncWorkflowRecord(ctx) + err = workflowService.SyncWorkflowRecord(ctx, appName, re.Name, app, nil) Expect(err).Should(BeNil()) workflow, err = workflowService.GetWorkflow(context.TODO(), &model.Application{ @@ -320,42 +319,6 @@ var _ = Describe("Test workflow service functions", func() { } err = workflowService.createTestApplicationRevision(context.TODO(), anotherRevision) Expect(err).Should(BeNil()) - - By("create one application revision to test sync workflow record") - - appWithRevision := &v1beta1.Application{} - err = json.Unmarshal(raw, appWithRevision) - Expect(err).Should(BeNil()) - var appRevision = &v1beta1.ApplicationRevision{ - ObjectMeta: metav1.ObjectMeta{ - Name: "1111-v1", - Namespace: "default", - Labels: map[string]string{"vela.io/wf-revision": "test-workflow-2-111"}, - }, - Spec: v1beta1.ApplicationRevisionSpec{ - ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{ - Application: *appWithRevision, - }, - }, - } - err = workflowService.KubeClient.Create(ctx, appRevision) - Expect(err).Should(BeNil()) - appRevision.Status.Workflow = appWithRevision.Status.Workflow - appRevision.Status.Workflow.AppRevision = app.Annotations[oam.AnnotationPublishVersion] - err = workflowService.KubeClient.Status().Update(ctx, appRevision) - Expect(err).Should(BeNil()) - err = workflowService.SyncWorkflowRecord(ctx) - Expect(err).Should(BeNil()) - - By("check the record") - anotherRecord, err := workflowService.DetailWorkflowRecord(context.TODO(), workflow, "test-workflow-2-111") - Expect(err).Should(BeNil()) - Expect(anotherRecord.Status).Should(Equal(string(workflowv1alpha1.WorkflowStepPhaseFailed))) - - By("check the application revision") - err = workflowService.Store.Get(ctx, anotherRevision) - Expect(err).Should(BeNil()) - Expect(anotherRevision.Status).Should(Equal(model.RevisionStatusFail)) }) It("Test CreateRecord function", func() { diff --git a/pkg/server/event/event.go b/pkg/server/event/event.go index 7a95e765b..ba80cdf43 100644 --- a/pkg/server/event/event.go +++ b/pkg/server/event/event.go @@ -35,15 +35,12 @@ type Worker interface { // InitEvent init all event worker func InitEvent(cfg config.Config) []interface{} { - workflow := &sync.WorkflowRecordSync{ - Duration: cfg.LeaderConfig.Duration, - } application := &sync.ApplicationSync{ Queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), } collect := &collect.InfoCalculateCronJob{} - workers = append(workers, workflow, application, collect) - return []interface{}{workflow, application, collect} + workers = append(workers, application, collect) + return []interface{}{application, collect} } // StartEventWorker start all event worker diff --git a/pkg/server/event/sync/cache.go b/pkg/server/event/sync/cache.go index 9cee8b656..7548d34c2 100644 --- a/pkg/server/event/sync/cache.go +++ b/pkg/server/event/sync/cache.go @@ -66,16 +66,19 @@ func (c *CR2UX) initCache(ctx context.Context) error { return nil } +func (c *CR2UX) appFromUX(targetApp *v1beta1.Application) bool { + if targetApp != nil && targetApp.Labels != nil && targetApp.Labels[types.LabelSourceOfTruth] == types.FromUX { + return true + } + return false +} + func (c *CR2UX) shouldSync(ctx context.Context, targetApp *v1beta1.Application, del bool) bool { if targetApp != nil && targetApp.Labels != nil { // the source is inner and is not the addon application, ignore it. if targetApp.Labels[types.LabelSourceOfTruth] == types.FromInner && targetApp.Labels[oam.LabelAddonName] == "" { return false } - // the source is UX, ignore it - if targetApp.Labels[types.LabelSourceOfTruth] == types.FromUX { - return false - } } // if no LabelSourceOfTruth label, it means the app is existing ones, check the existing labels and annotations diff --git a/pkg/server/event/sync/cache_test.go b/pkg/server/event/sync/cache_test.go index e12a96e8d..41cd7a280 100644 --- a/pkg/server/event/sync/cache_test.go +++ b/pkg/server/event/sync/cache_test.go @@ -89,7 +89,7 @@ var _ = Describe("Test Cache", func() { app3.Namespace = "app3-ns" app3.ResourceVersion = "3" app3.Labels = map[string]string{ - velatypes.LabelSourceOfTruth: velatypes.FromUX, + velatypes.LabelSourceOfTruth: velatypes.FromInner, } Expect(cr2ux.shouldSync(ctx, app3, false)).Should(BeEquivalentTo(false)) diff --git a/pkg/server/event/sync/cr2ux.go b/pkg/server/event/sync/cr2ux.go index 79964faee..97ffe3b86 100644 --- a/pkg/server/event/sync/cr2ux.go +++ b/pkg/server/event/sync/cr2ux.go @@ -18,12 +18,14 @@ package sync import ( "context" + "fmt" "sync" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" + "github.com/oam-dev/kubevela/pkg/oam" "github.com/kubevela/velaux/pkg/server/domain/model" "github.com/kubevela/velaux/pkg/server/domain/service" @@ -59,6 +61,7 @@ type CR2UX struct { cache sync.Map projectService service.ProjectService applicationService service.ApplicationService + workflowService service.WorkflowService targetService service.TargetService envService service.EnvService } @@ -73,13 +76,24 @@ func (c *CR2UX) getAppMetaName(ctx context.Context, name, namespace string) stri return appName } -// AddOrUpdate will sync application CR to storage of VelaUX automatically -func (c *CR2UX) AddOrUpdate(ctx context.Context, targetApp *v1beta1.Application) error { - ds := c.ds - if !c.shouldSync(ctx, targetApp, false) { - return nil +func (c *CR2UX) syncAppCreatedByUX(ctx context.Context, targetApp *v1beta1.Application) error { + appPrimaryKey := targetApp.Annotations[oam.AnnotationAppName] + if appPrimaryKey == "" { + return fmt.Errorf("appName is empty in application %s", targetApp.Name) + } + var recordName string + if targetApp.Annotations != nil { + recordName = targetApp.Annotations[oam.AnnotationPublishVersion] } + if err := c.workflowService.SyncWorkflowRecord(ctx, appPrimaryKey, recordName, targetApp, nil); err != nil { + klog.ErrorS(err, "failed to sync workflow status", "oam app name", targetApp.Name, "workflow name", oam.GetPublishVersion(targetApp), "record name", recordName) + return err + } + return nil +} +func (c *CR2UX) syncAppCreatedByCLI(ctx context.Context, targetApp *v1beta1.Application) error { + ds := c.ds dsApp, err := c.ConvertApp2DatastoreApp(ctx, targetApp) if err != nil { klog.Errorf("Convert App to data store err %v", err) @@ -141,6 +155,17 @@ func (c *CR2UX) AddOrUpdate(ctx context.Context, targetApp *v1beta1.Application) return nil } +// AddOrUpdate will sync application CR to storage of VelaUX automatically +func (c *CR2UX) AddOrUpdate(ctx context.Context, targetApp *v1beta1.Application) error { + if c.appFromUX(targetApp) { + return c.syncAppCreatedByUX(ctx, targetApp) + } + if c.shouldSync(ctx, targetApp, false) { + return c.syncAppCreatedByCLI(ctx, targetApp) + } + return nil +} + // DeleteApp will delete the application as the CR was deleted func (c *CR2UX) DeleteApp(ctx context.Context, targetApp *v1beta1.Application) error { if !c.shouldSync(ctx, targetApp, true) { diff --git a/pkg/server/event/sync/worker.go b/pkg/server/event/sync/worker.go index 1cffc2a83..6c6dac140 100644 --- a/pkg/server/event/sync/worker.go +++ b/pkg/server/event/sync/worker.go @@ -45,6 +45,7 @@ type ApplicationSync struct { Store datastore.DataStore `inject:"datastore"` ProjectService service.ProjectService `inject:""` ApplicationService service.ApplicationService `inject:""` + WorkflowService service.WorkflowService `inject:""` TargetService service.TargetService `inject:""` EnvService service.EnvService `inject:""` Queue workqueue.RateLimitingInterface @@ -78,6 +79,7 @@ func (a *ApplicationSync) Start(ctx context.Context, errorChan chan error) { cache: sync.Map{}, projectService: a.ProjectService, applicationService: a.ApplicationService, + workflowService: a.WorkflowService, targetService: a.TargetService, envService: a.EnvService, } diff --git a/pkg/server/event/sync/workflow_record.go b/pkg/server/event/sync/workflow_record.go deleted file mode 100644 index e8e0dd544..000000000 --- a/pkg/server/event/sync/workflow_record.go +++ /dev/null @@ -1,50 +0,0 @@ -/* -Copyright 2022 The KubeVela Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sync - -import ( - "context" - "time" - - "k8s.io/klog/v2" - - "github.com/kubevela/velaux/pkg/server/domain/service" -) - -// WorkflowRecordSync sync workflow record from cluster to database -type WorkflowRecordSync struct { - Duration time.Duration - WorkflowService service.WorkflowService `inject:""` -} - -// Start sync workflow record data -func (w *WorkflowRecordSync) Start(ctx context.Context, errorChan chan error) { - klog.Infof("workflow record syncing worker started") - defer klog.Infof("workflow record syncing worker closed") - t := time.NewTicker(w.Duration) - defer t.Stop() - for { - select { - case <-t.C: - if err := w.WorkflowService.SyncWorkflowRecord(ctx); err != nil { - klog.Errorf("syncWorkflowRecordError: %s", err.Error()) - } - case <-ctx.Done(): - return - } - } -}