Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions cmd/argocd/commands/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func NewApplicationCommand(clientOpts *argocdclient.ClientOptions) *cobra.Comman
command.AddCommand(NewApplicationDeleteCommand(clientOpts))
command.AddCommand(NewApplicationWaitCommand(clientOpts))
command.AddCommand(NewApplicationManifestsCommand(clientOpts))
command.AddCommand(NewApplicationTerminateOpCommand(clientOpts))
return command
}

Expand Down Expand Up @@ -1122,3 +1123,25 @@ func NewApplicationManifestsCommand(clientOpts *argocdclient.ClientOptions) *cob
command.Flags().StringVar(&revision, "revision", "", "Show manifests at a specific revision")
return command
}

// NewApplicationTerminateOpCommand builds the `argocd app terminate-op` command. The command
// takes a single application name and calls TerminateOperation on the application client for it.
func NewApplicationTerminateOpCommand(clientOpts *argocdclient.ClientOptions) *cobra.Command {
	// run requires exactly one positional argument (the application name); with any other
	// argument count it prints the command help and exits with status 1.
	run := func(c *cobra.Command, args []string) {
		if len(args) != 1 {
			c.HelpFunc()(c, args)
			os.Exit(1)
		}
		name := args[0]
		closer, client := argocdclient.NewClientOrDie(clientOpts).NewApplicationClientOrDie()
		defer util.Close(closer)
		request := application.OperationTerminateRequest{Name: &name}
		_, err := client.TerminateOperation(context.Background(), &request)
		errors.CheckError(err)
		fmt.Printf("Application '%s' operation terminating\n", name)
	}
	return &cobra.Command{
		Use:   "terminate-op APPNAME",
		Short: "Terminate running operation of an application",
		Run:   run,
	}
}
74 changes: 43 additions & 31 deletions controller/appcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,21 +327,20 @@ func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Appli
} else {
state.Message = fmt.Sprintf("%v", r)
}
ctrl.setOperationState(app, state, app.Operation)
ctrl.setOperationState(app, state)
}
}()
if isOperationRunning(app) {
// If we get here, we are about process an operation but we notice it is already Running.
// We need to detect if the controller crashed before completing the operation, or if the
// the app object we pulled off the informer is simply stale and doesn't reflect the fact
// that the operation is completed. We don't want to perform the operation again. To detect
// this, always retrieve the latest version to ensure it is not stale.
if isOperationInProgress(app) {
// If we get here, we are about to process an operation but we notice it is already in progress.
// We need to detect if the app object we pulled off the informer is stale and doesn't
// reflect the fact that the operation is completed. We don't want to perform the operation
// again. To detect this, always retrieve the latest version to ensure it is not stale.
freshApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ctrl.namespace).Get(app.ObjectMeta.Name, metav1.GetOptions{})
if err != nil {
log.Errorf("Failed to retrieve latest application state: %v", err)
return
}
if !isOperationRunning(freshApp) {
if !isOperationInProgress(freshApp) {
log.Infof("Skipping operation on stale application state (%s)", app.ObjectMeta.Name)
return
}
Expand All @@ -350,51 +349,64 @@ func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Appli
log.Infof("Resuming in-progress operation. app: %s, phase: %s, message: %s", app.ObjectMeta.Name, state.Phase, state.Message)
} else {
state = &appv1.OperationState{Phase: appv1.OperationRunning, Operation: *app.Operation, StartedAt: metav1.Now()}
ctrl.setOperationState(app, state, app.Operation)
ctrl.setOperationState(app, state)
log.Infof("Initialized new operation. app: %s, operation: %v", app.ObjectMeta.Name, *app.Operation)
}
ctrl.appStateManager.SyncAppState(app, state)
ctrl.setOperationState(app, state, app.Operation)

if state.Phase == appv1.OperationRunning {
// It's possible for an app to be terminated while we were operating on it. We do not want
// to clobber the Terminated state with Running. Get the latest app state to check for this.
freshApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ctrl.namespace).Get(app.ObjectMeta.Name, metav1.GetOptions{})
if err == nil {
if freshApp.Status.OperationState != nil && freshApp.Status.OperationState.Phase == appv1.OperationTerminating {
state.Phase = appv1.OperationTerminating
state.Message = "operation is terminating"
// after this, we will get requeued to the workqueue, but next time the
// SyncAppState will operate in a Terminating phase, allowing the worker to perform
// cleanup (e.g. delete jobs, workflows, etc...)
}
}
}

ctrl.setOperationState(app, state)
if state.Phase.Completed() {
// if we just completed an operation, force a refresh so that UI will report up-to-date
// sync/health information
ctrl.forceAppRefresh(app.ObjectMeta.Name)
}
}

func (ctrl *ApplicationController) setOperationState(app *appv1.Application, state *appv1.OperationState, operation *appv1.Operation) {
func (ctrl *ApplicationController) setOperationState(app *appv1.Application, state *appv1.OperationState) {
retryUntilSucceed(func() error {
var inProgressOpValue *appv1.Operation
if state.Phase == "" {
// expose any bugs where we neglect to set phase
panic("no phase was set")
}
if !state.Phase.Completed() {
// If operation is still running, we populate the app.operation field, which prevents
// any other operation from running at the same time. Otherwise, it is cleared by setting
// it to nil which indicates no operation is in progress.
inProgressOpValue = operation
} else {
nowTime := metav1.Now()
state.FinishedAt = &nowTime
if state.Phase.Completed() {
now := metav1.Now()
state.FinishedAt = &now
}

if reflect.DeepEqual(app.Operation, inProgressOpValue) && reflect.DeepEqual(app.Status.OperationState, state) {
log.Infof("No operation updates necessary to '%s'. Skipping patch", app.Name)
return nil
}

patch, err := json.Marshal(map[string]interface{}{
patch := map[string]interface{}{
"status": map[string]interface{}{
"operationState": state,
},
"operation": inProgressOpValue,
})
}
if state.Phase.Completed() {
// If operation is completed, clear the operation field to indicate no operation is
// in progress.
patch["operation"] = nil
}
if reflect.DeepEqual(app.Status.OperationState, state) {
log.Infof("No operation updates necessary to '%s'. Skipping patch", app.Name)
return nil
}
patchJSON, err := json.Marshal(patch)
if err != nil {
return err
}
appClient := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ctrl.namespace)
_, err = appClient.Patch(app.Name, types.MergePatchType, patch)
_, err = appClient.Patch(app.Name, types.MergePatchType, patchJSON)
if err != nil {
return err
}
Expand Down Expand Up @@ -679,6 +691,6 @@ func newApplicationInformer(
return informer
}

func isOperationRunning(app *appv1.Application) bool {
func isOperationInProgress(app *appv1.Application) bool {
return app.Status.OperationState != nil && !app.Status.OperationState.Phase.Completed()
}
77 changes: 68 additions & 9 deletions controller/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -153,7 +154,12 @@ func (s *ksonnetAppStateManager) SyncAppState(app *appv1.Application, state *app
log: log.WithFields(log.Fields{"application": app.Name}),
}

syncCtx.sync()
if state.Phase == appv1.OperationTerminating {
syncCtx.terminate()
} else {
syncCtx.sync()
}

if !syncOp.DryRun && syncCtx.opState.Phase.Successful() {
err := s.persistDeploymentInfo(app, manifestInfo.Revision, manifestInfo.Params, nil)
if err != nil {
Expand Down Expand Up @@ -561,7 +567,8 @@ func (sc *syncContext) runHook(hook *unstructured.Unstructured, hookType appv1.H
} else {
liveObj = existing
}
return sc.updateHookStatus(liveObj, hookType), nil
hookStatus := newHookStatus(liveObj, hookType)
return sc.updateHookStatus(hookStatus), nil
}

// isHookType tells whether or not the supplied object is a hook of the specified type
Expand Down Expand Up @@ -648,10 +655,7 @@ func (sc *syncContext) getHookStatus(hookObj *unstructured.Unstructured, hookTyp
return nil
}

// updateHookStatus updates the status of a hook. Returns whether or not the hook was changed or not
func (sc *syncContext) updateHookStatus(hook *unstructured.Unstructured, hookType appv1.HookType) bool {
sc.lock.Lock()
defer sc.lock.Unlock()
func newHookStatus(hook *unstructured.Unstructured, hookType appv1.HookType) appv1.HookStatus {
hookStatus := appv1.HookStatus{
Name: hook.GetName(),
Kind: hook.GetKind(),
Expand Down Expand Up @@ -714,17 +718,23 @@ func (sc *syncContext) updateHookStatus(hook *unstructured.Unstructured, hookTyp
hookStatus.Status = appv1.OperationSucceeded
hookStatus.Message = fmt.Sprintf("%s created", hook.GetName())
}
return hookStatus
}

// updateHookStatus updates the status of a hook. Returns whether or not the hook was changed
func (sc *syncContext) updateHookStatus(hookStatus appv1.HookStatus) bool {
sc.lock.Lock()
defer sc.lock.Unlock()
for i, prev := range sc.syncRes.Hooks {
if prev.Name == hookStatus.Name && prev.Kind == hookStatus.Kind && prev.Type == hookType {
if prev.Name == hookStatus.Name && prev.Kind == hookStatus.Kind && prev.Type == hookStatus.Type {
if reflect.DeepEqual(prev, hookStatus) {
return false
}
if prev.Status != hookStatus.Status {
sc.log.Infof("Hook %s %s/%s status: %s -> %s", hookType, prev.Kind, prev.Name, prev.Status, hookStatus.Status)
sc.log.Infof("Hook %s %s/%s status: %s -> %s", hookStatus.Type, prev.Kind, prev.Name, prev.Status, hookStatus.Status)
}
if prev.Message != hookStatus.Message {
sc.log.Infof("Hook %s %s/%s message: '%s' -> '%s'", hookType, prev.Kind, prev.Name, prev.Message, hookStatus.Message)
sc.log.Infof("Hook %s %s/%s message: '%s' -> '%s'", hookStatus.Type, prev.Kind, prev.Name, prev.Message, hookStatus.Message)
}
sc.syncRes.Hooks[i] = &hookStatus
return true
Expand All @@ -751,3 +761,52 @@ func areHooksCompletedSuccessful(hookType appv1.HookType, hookStatuses []*appv1.
}
return true, isSuccessful
}

// terminate looks for any Job/Workflow hooks which have not completed, deletes the backing
// resource and records the hook as Failed. Hooks of any other kind are left untouched.
//
// When every deletion succeeds the operation phase is set to Failed ("Application terminated");
// if any deletion returned an error the phase is set to Error ("Termination had errors").
func (sc *syncContext) terminate() {
	terminateSuccessful := true
	for _, hookStatus := range sc.syncRes.Hooks {
		if hookStatus.Status.Completed() {
			continue
		}
		switch hookStatus.Kind {
		case "Job", "Workflow":
			// Build the new status on a copy rather than mutating the entry stored in
			// sc.syncRes.Hooks. updateHookStatus compares the stored entry against the value it
			// is given (under sc.lock) to decide what changed; mutating the stored entry first
			// would make the two look identical and suppress its status/message transition logs.
			updated := *hookStatus
			updated.Status = appv1.OperationFailed
			if err := sc.deleteHook(updated.Name, updated.Kind, updated.APIVersion); err != nil {
				updated.Message = fmt.Sprintf("Failed to delete %s hook %s/%s: %v", updated.Type, updated.Kind, updated.Name, err)
				terminateSuccessful = false
			} else {
				updated.Message = fmt.Sprintf("Deleted %s hook %s/%s", updated.Type, updated.Kind, updated.Name)
			}
			sc.updateHookStatus(updated)
		}
	}
	if terminateSuccessful {
		sc.setOperationPhase(appv1.OperationFailed, "Application terminated")
	} else {
		sc.setOperationPhase(appv1.OperationError, "Termination had errors")
	}
}

// deleteHook deletes the hook resource identified by name, kind and apiVersion from
// sc.namespace using a dynamic client.
//
// apiVersion is parsed with schema.ParseGroupVersion, so both grouped ("batch/v1") and
// core-group ("v1") values are accepted. An error is returned when apiVersion cannot be parsed
// or carries no version, when no client or API resource can be resolved for the resulting
// group/version/kind, or when the delete call itself fails.
func (sc *syncContext) deleteHook(name, kind, apiVersion string) error {
	gv, err := schema.ParseGroupVersion(apiVersion)
	// ParseGroupVersion accepts the empty string (and "/") without error, so the missing
	// version has to be rejected explicitly.
	if err != nil || gv.Version == "" {
		return fmt.Errorf("Failed to terminate app. Unrecognized group/version: %s", apiVersion)
	}
	gvk := gv.WithKind(kind)
	dclient, err := sc.dynClientPool.ClientForGroupVersionKind(gvk)
	if err != nil {
		return err
	}
	apiResource, err := kube.ServerResourceForGroupVersionKind(sc.disco, gvk)
	if err != nil {
		return err
	}
	resIf := dclient.Resource(apiResource, sc.namespace)
	return resIf.Delete(name, &metav1.DeleteOptions{})
}
4 changes: 2 additions & 2 deletions pkg/apis/application/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions pkg/apis/application/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ type Operation struct {
type OperationPhase string

const (
OperationRunning OperationPhase = "Running"
OperationFailed OperationPhase = "Failed"
OperationError OperationPhase = "Error"
OperationSucceeded OperationPhase = "Succeeded"
OperationRunning OperationPhase = "Running"
OperationTerminating OperationPhase = "Terminating"
OperationFailed OperationPhase = "Failed"
OperationError OperationPhase = "Error"
OperationSucceeded OperationPhase = "Succeeded"
)

func (os OperationPhase) Completed() bool {
Expand Down
31 changes: 31 additions & 0 deletions server/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,3 +625,34 @@ func (s *Server) setAppOperation(ctx context.Context, appName string, operationC
}
}
}

// TerminateOperation requests termination of the operation currently in progress on the named
// application by setting its operation state phase to Terminating.
//
// It returns InvalidArgument when the request carries no application name or when the
// application has no operation in progress, grpc.ErrPermissionDenied when the caller's claims
// do not allow "terminateop" on the application, and Internal when the update keeps hitting
// write conflicts. Any other error from fetching or updating the application is returned as-is.
func (s *Server) TerminateOperation(ctx context.Context, termOpReq *OperationTerminateRequest) (*OperationTerminateResponse, error) {
	// Name is a pointer; guard it so a malformed request cannot panic the server.
	if termOpReq == nil || termOpReq.Name == nil {
		return nil, status.Errorf(codes.InvalidArgument, "Unable to terminate operation. Application name is required")
	}
	appName := *termOpReq.Name
	appIf := s.appclientset.ArgoprojV1alpha1().Applications(s.ns)

	a, err := appIf.Get(appName, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}
	if !s.enf.EnforceClaims(ctx.Value("claims"), "applications", "terminateop", appRBACName(*a)) {
		return nil, grpc.ErrPermissionDenied
	}

	// Optimistic update: on a conflict, re-read the application and try again, up to 10 times.
	for i := 0; i < 10; i++ {
		if a.Operation == nil || a.Status.OperationState == nil {
			return nil, status.Errorf(codes.InvalidArgument, "Unable to terminate operation. No operation is in progress")
		}
		a.Status.OperationState.Phase = appv1.OperationTerminating
		_, err = appIf.Update(a)
		if err == nil {
			return &OperationTerminateResponse{}, nil
		}
		if !apierr.IsConflict(err) {
			return nil, err
		}
		// Log the dereferenced name; passing the *string to %s would print a pointer.
		log.Warnf("Failed to set operation for app '%s' due to update conflict. Retrying again...", appName)
		time.Sleep(100 * time.Millisecond)
		a, err = appIf.Get(appName, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
	}
	return nil, status.Errorf(codes.Internal, "Failed to terminate app. Too many conflicts")
}
Loading