Skip to content

Commit

Permalink
Chore: refactor workflow status syncer with informer
Browse files Browse the repository at this point in the history
Signed-off-by: FogDong <[email protected]>
  • Loading branch information
FogDong committed Mar 29, 2023
1 parent 93ecac2 commit a59193b
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 330 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ dist

.pnp.*

tsconfig.tsbuildinfo
tsconfig.tsbuildinfo
288 changes: 63 additions & 225 deletions pkg/server/domain/service/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import (
"io"
"strconv"
"strings"
"time"

"helm.sh/helm/v3/pkg/time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -77,7 +76,7 @@ type WorkflowService interface {
CreateWorkflowRecord(ctx context.Context, appModel *model.Application, app *v1beta1.Application, workflow *model.Workflow) (*model.WorkflowRecord, error)
ListWorkflowRecords(ctx context.Context, workflow *model.Workflow, page, pageSize int) (*apisv1.ListWorkflowRecordsResponse, error)
DetailWorkflowRecord(ctx context.Context, workflow *model.Workflow, recordName string) (*apisv1.DetailWorkflowRecordResponse, error)
SyncWorkflowRecord(ctx context.Context) error
SyncWorkflowRecord(ctx context.Context, appKey, recordName string, app *v1beta1.Application, workflowContext map[string]string) error
ResumeRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName, stepName string) error
TerminateRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName string) error
RollbackRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName, revisionName string) (*apisv1.WorkflowRecordBase, error)
Expand Down Expand Up @@ -353,135 +352,10 @@ func (w *workflowServiceImpl) DetailWorkflowRecord(ctx context.Context, workflow
}, nil
}

func (w *workflowServiceImpl) SyncWorkflowRecord(ctx context.Context) error {
var record = model.WorkflowRecord{
Finished: "false",
func (w *workflowServiceImpl) SyncWorkflowRecord(ctx context.Context, appPrimaryKey, recordName string, app *v1beta1.Application, workflowContext map[string]string) error {
if app == nil || app.Annotations == nil || app.Status.Workflow == nil {
return nil
}
// list all unfinished workflow records
records, err := w.Store.List(ctx, &record, &datastore.ListOptions{})
if err != nil {
return err
}

for _, item := range records {
app := &v1beta1.Application{}
record := item.(*model.WorkflowRecord)
workflow := &model.Workflow{
Name: record.WorkflowName,
AppPrimaryKey: record.AppPrimaryKey,
}
if err := w.Store.Get(ctx, workflow); err != nil {
klog.ErrorS(err, "failed to get workflow", "app name", record.AppPrimaryKey, "workflow name", record.WorkflowName, "record name", record.Name)
continue
}
envbinding, err := w.EnvBindingService.GetEnvBinding(ctx, &model.Application{Name: record.AppPrimaryKey}, workflow.EnvName)
if err != nil {
klog.ErrorS(err, "failed to get envbinding", "app name", record.AppPrimaryKey, "workflow name", record.WorkflowName, "record name", record.Name)
}
var appName string
if envbinding != nil {
appName = envbinding.AppDeployName
}
if appName == "" {
appName = record.AppPrimaryKey
}
if err := w.KubeClient.Get(ctx, types.NamespacedName{
Name: appName,
Namespace: record.Namespace,
}, app); err != nil {
if apierrors.IsNotFound(err) {
klog.Warningf("can't find the application %s/%s, set the record status to terminated", appName, record.Namespace)
if err := w.setRecordToTerminated(ctx, record.AppPrimaryKey, record.Name); err != nil {
klog.Errorf("failed to set the record status to terminated %s", err.Error())
}
continue
}
klog.ErrorS(err, "failed to get app", "oam app name", appName, "workflow name", record.WorkflowName, "record name", record.Name)
continue
}

if app.Status.Workflow == nil {
continue
}

// This means the application workflow has not run.
if app.Generation > app.Status.ObservedGeneration {
continue
}

// there is a ":" in the default app revision
recordName := strings.Replace(app.Status.Workflow.AppRevision, ":", "-", 1)

// try to sync the status from the running application
if app.Annotations != nil && app.Status.Workflow != nil && recordName == record.Name {
if err := w.syncWorkflowStatus(ctx, record.AppPrimaryKey, app, record.Name, app.Name, nil); err != nil {
klog.ErrorS(err, "failed to sync workflow status", "oam app name", appName, "workflow name", record.WorkflowName, "record name", record.Name)
}
}

if record.Name == oam.GetPublishVersion(app) {
continue
}

// try to sync the status from the application revision
var revision = &model.ApplicationRevision{AppPrimaryKey: record.AppPrimaryKey, Version: record.RevisionPrimaryKey}
if err := w.Store.Get(ctx, revision); err != nil {
if errors.Is(err, datastore.ErrRecordNotExist) {
// If the application revision is not exist, the record do not need be synced
var record = &model.WorkflowRecord{
AppPrimaryKey: record.AppPrimaryKey,
Name: recordName,
}
if err := w.Store.Get(ctx, record); err == nil {
record.Finished = "true"
record.Status = model.RevisionStatusFail
err := w.Store.Put(ctx, record)
if err != nil {
klog.Errorf("failed to set the workflow status is failure %s", err.Error())
}
continue
}
}
klog.Errorf("failed to get the application revision from database %s", err.Error())
continue
}

var appRevision v1beta1.ApplicationRevision
if err := w.KubeClient.Get(ctx, types.NamespacedName{Namespace: app.Namespace, Name: revision.RevisionCRName}, &appRevision); err != nil {
if apierrors.IsNotFound(err) {
klog.Warningf("can't find the application revision %s/%s, set the record status to terminated", revision.RevisionCRName, app.Namespace)
if err := w.setRecordToTerminated(ctx, record.AppPrimaryKey, record.Name); err != nil {
klog.Errorf("failed to set the record status to terminated %s", err.Error())
}
continue
}
klog.Warningf("failed to get the application revision %s", err.Error())
continue
}

if appRevision.Status.Workflow != nil {
appRevision.Spec.Application.Status.Workflow = appRevision.Status.Workflow
if !appRevision.Spec.Application.Status.Workflow.Finished {
appRevision.Spec.Application.Status.Workflow.Finished = true
appRevision.Spec.Application.Status.Workflow.Terminated = true
}
}
if err := w.syncWorkflowStatus(ctx,
record.AppPrimaryKey,
&appRevision.Spec.Application,
record.Name,
appRevision.Name,
appRevision.Status.WorkflowContext,
); err != nil {
klog.ErrorS(err, "failed to sync workflow status", "oam app name", appName, "workflow name", record.WorkflowName, "record name", record.Name)
continue
}
}

return nil
}

func (w *workflowServiceImpl) setRecordToTerminated(ctx context.Context, appPrimaryKey, recordName string) error {
var record = &model.WorkflowRecord{
AppPrimaryKey: appPrimaryKey,
Name: recordName,
Expand All @@ -499,114 +373,76 @@ func (w *workflowServiceImpl) setRecordToTerminated(ctx context.Context, appPrim
}
return err
}
record.Status = model.RevisionStatusTerminated
record.Finished = "true"

revision.Status = model.RevisionStatusTerminated

if err := w.Store.Put(ctx, record); err != nil {
return err
if workflowContext != nil {
record.ContextValue = workflowContext
}

if err := w.Store.Put(ctx, revision); err != nil {
return err
}
return nil
}
status := app.Status.Workflow
record.Status = string(status.Phase)
record.Message = status.Message
record.Mode = status.Mode

func (w *workflowServiceImpl) syncWorkflowStatus(ctx context.Context,
appPrimaryKey string,
app *v1beta1.Application,
recordName,
source string,
workflowContext map[string]string) error {
var record = &model.WorkflowRecord{
AppPrimaryKey: appPrimaryKey,
Name: recordName,
}
if err := w.Store.Get(ctx, record); err != nil {
if errors.Is(err, datastore.ErrRecordNotExist) {
return bcode.ErrWorkflowRecordNotExist
}
return err
}
var revision = &model.ApplicationRevision{AppPrimaryKey: appPrimaryKey, Version: record.RevisionPrimaryKey}
if err := w.Store.Get(ctx, revision); err != nil {
if errors.Is(err, datastore.ErrRecordNotExist) {
return bcode.ErrApplicationRevisionNotExist
if cb := app.Status.Workflow.ContextBackend; cb != nil && workflowContext == nil && cb.Namespace != "" && cb.Name != "" {
var cm corev1.ConfigMap
if err := w.KubeClient.Get(ctx, types.NamespacedName{Namespace: cb.Namespace, Name: cb.Name}, &cm); err != nil {
klog.Errorf("failed to load the context values of the application %s:%s", app.Name, err.Error())
}
return err
record.ContextValue = cm.Data
}

if workflowContext != nil {
record.ContextValue = workflowContext
}

if app.Status.Workflow != nil {
if app.Status.Workflow.AppRevision != record.Name {
klog.Warningf("the app(%s) revision is not match the record(%s), try next time..", app.Name, record.Name)
return nil
}
status := app.Status.Workflow
record.Status = string(status.Phase)
record.Message = status.Message
record.Mode = status.Mode

if cb := app.Status.Workflow.ContextBackend; cb != nil && workflowContext == nil && cb.Namespace != "" && cb.Name != "" {
var cm corev1.ConfigMap
if err := w.KubeClient.Get(ctx, types.NamespacedName{Namespace: cb.Namespace, Name: cb.Name}, &cm); err != nil {
klog.Errorf("failed to load the context values of the application %s:%s", app.Name, err.Error())
}
record.ContextValue = cm.Data
stepStatus := make(map[string]*model.WorkflowStepStatus, len(status.Steps))
stepAlias := make(map[string]string)
for _, step := range record.Steps {
stepAlias[step.Name] = step.Alias
for _, sub := range step.SubStepsStatus {
stepAlias[sub.Name] = sub.Alias
}

stepStatus := make(map[string]*model.WorkflowStepStatus, len(status.Steps))
stepAlias := make(map[string]string)
for _, step := range record.Steps {
stepAlias[step.Name] = step.Alias
for _, sub := range step.SubStepsStatus {
stepAlias[sub.Name] = sub.Alias
}
}
for _, step := range status.Steps {
stepStatus[step.Name] = &model.WorkflowStepStatus{
StepStatus: convert.FromCRWorkflowStepStatus(step.StepStatus, stepAlias[step.Name]),
SubStepsStatus: make([]model.StepStatus, 0),
}
for _, step := range status.Steps {
stepStatus[step.Name] = &model.WorkflowStepStatus{
StepStatus: convert.FromCRWorkflowStepStatus(step.StepStatus, stepAlias[step.Name]),
SubStepsStatus: make([]model.StepStatus, 0),
}
for _, sub := range step.SubStepsStatus {
stepStatus[step.Name].SubStepsStatus = append(stepStatus[step.Name].SubStepsStatus, convert.FromCRWorkflowStepStatus(sub, stepAlias[sub.Name]))
}
for _, sub := range step.SubStepsStatus {
stepStatus[step.Name].SubStepsStatus = append(stepStatus[step.Name].SubStepsStatus, convert.FromCRWorkflowStepStatus(sub, stepAlias[sub.Name]))
}
for i, step := range record.Steps {
if stepStatus[step.Name] != nil {
record.Steps[i] = *stepStatus[step.Name]
}
}
for i, step := range record.Steps {
if stepStatus[step.Name] != nil {
record.Steps[i] = *stepStatus[step.Name]
} else {
record.Steps[i].Phase = ""
record.Steps[i].Message = ""
record.Steps[i].Reason = ""
record.Steps[i].FirstExecuteTime = time.Time{}
record.Steps[i].LastExecuteTime = time.Time{}
}
}

// the auto generated workflow steps should be sync
if (len(record.Steps) == 0) && len(status.Steps) > 0 {
for k := range stepStatus {
record.Steps = append(record.Steps, *stepStatus[k])
}
// the auto generated workflow steps should be sync
if (len(record.Steps) == 0) && len(status.Steps) > 0 {
for k := range stepStatus {
record.Steps = append(record.Steps, *stepStatus[k])
}
}

record.Finished = strconv.FormatBool(status.Finished)
record.EndTime = status.EndTime.Time
if err := w.Store.Put(ctx, record); err != nil {
return err
}
record.Finished = strconv.FormatBool(status.Finished)
record.EndTime = status.EndTime.Time
if err := w.Store.Put(ctx, record); err != nil {
return err
}

revision.Status = generateRevisionStatus(status.Phase)
if app.Status.LatestRevision != nil {
revision.RevisionCRName = app.Status.LatestRevision.Name
}
if err := w.Store.Put(ctx, revision); err != nil {
return err
}
revision.Status = generateRevisionStatus(status.Phase)
if app.Status.LatestRevision != nil {
revision.RevisionCRName = app.Status.LatestRevision.Name
}
if err := w.Store.Put(ctx, revision); err != nil {
return err
}

if record.Finished == "true" {
klog.InfoS("successfully sync workflow status", "oam app name", app.Name, "workflow name", record.WorkflowName, "record name", record.Name, "status", record.Status, "sync source", source)
klog.InfoS("successfully sync workflow status", "oam app name", app.Name, "workflow name", record.WorkflowName, "record name", record.Name, "status", record.Status, "sync source", app.Name)
}

return nil
Expand Down Expand Up @@ -662,7 +498,7 @@ func (w *workflowServiceImpl) CreateWorkflowRecord(ctx context.Context, appModel
Name: app.Annotations[oam.AnnotationPublishVersion],
Namespace: app.Namespace,
Finished: "false",
StartTime: time.Now().Time,
StartTime: time.Now(),
Steps: steps,
Status: string(workflowv1alpha1.WorkflowStateInitializing),
}
Expand Down Expand Up @@ -699,6 +535,7 @@ func resetRevisionsAndRecords(ctx context.Context, ds datastore.DataStore, appNa
if err := ds.Put(ctx, revision); err != nil {
klog.Info("failed to set rest revisions' status to terminate", "app name", appName, "revision version", revision.Version, "error", err)
}
klog.Info("set rest revisions' status to terminate", "app name", appName, "revision version", revision.Version)
}
}

Expand Down Expand Up @@ -729,6 +566,7 @@ func resetRevisionsAndRecords(ctx context.Context, ds datastore.DataStore, appNa
if err := ds.Put(ctx, record); err != nil {
klog.Info("failed to set rest records' status to terminate", "app name", appName, "workflow name", record.WorkflowName, "record name", record.Name, "error", err)
}
klog.Info("set rest records' status to terminate", "app name", appName, "workflow name", record.WorkflowName, "record name", record.Name)
}
}

Expand Down Expand Up @@ -768,7 +606,7 @@ func (w *workflowServiceImpl) ResumeRecord(ctx context.Context, appModel *model.
return err
}

if err := w.syncWorkflowStatus(ctx, appModel.PrimaryKey(), oamApp, recordName, oamApp.Name, nil); err != nil {
if err := w.SyncWorkflowRecord(ctx, appModel.PrimaryKey(), recordName, oamApp, nil); err != nil {
return err
}

Expand All @@ -783,7 +621,7 @@ func (w *workflowServiceImpl) TerminateRecord(ctx context.Context, appModel *mod
if err := operation.TerminateWorkflow(ctx, w.KubeClient, oamApp); err != nil {
return err
}
if err := w.syncWorkflowStatus(ctx, appModel.PrimaryKey(), oamApp, recordName, oamApp.Name, nil); err != nil {
if err := w.SyncWorkflowRecord(ctx, appModel.PrimaryKey(), recordName, oamApp, nil); err != nil {
return err
}

Expand Down Expand Up @@ -846,7 +684,7 @@ func (w *workflowServiceImpl) RollbackRecord(ctx context.Context, appModel *mode
// update the original revision status to rollback
originalRevision.Status = model.RevisionStatusRollback
originalRevision.RollbackVersion = revisionVersion
originalRevision.UpdateTime = time.Now().Time
originalRevision.UpdateTime = time.Now()
if err := w.Store.Put(ctx, originalRevision); err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit a59193b

Please sign in to comment.