diff --git a/services/rsources/http/http.go b/services/rsources/http/http.go
index d1548fb24e..328a1aeb03 100644
--- a/services/rsources/http/http.go
+++ b/services/rsources/http/http.go
@@ -105,6 +105,9 @@ func (h *handler) getStatus(w http.ResponseWriter, r *http.Request) {
 		}
 		return
 	}
+
+	jobStatus.FixCorruptedStats(h.logger)
+
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Cache-Control", "no-store")
 	w.WriteHeader(http.StatusOK)
diff --git a/services/rsources/rsources.go b/services/rsources/rsources.go
index af815f94c7..525c7a049d 100644
--- a/services/rsources/rsources.go
+++ b/services/rsources/rsources.go
@@ -34,15 +34,60 @@ type Stats struct {
 	Failed uint `json:"failed"`
 }
 
-func (r Stats) completed() bool {
+func (r *Stats) completed() bool {
 	return r.In == r.Out+r.Failed
 }
 
+func (r *Stats) corrupted() bool {
+	return r.In < r.Out+r.Failed
+}
+
+func (r *Stats) fixCorrupted() {
+	if r.corrupted() {
+		r.In = r.Out + r.Failed
+	}
+}
+
 type JobStatus struct {
 	ID          string       `json:"id"`
 	TasksStatus []TaskStatus `json:"tasks"`
 }
 
+func (js *JobStatus) FixCorruptedStats(log logger.Logger) {
+	isCorrupted := func() bool {
+		for ti := range js.TasksStatus {
+			for si := range js.TasksStatus[ti].SourcesStatus {
+				if js.TasksStatus[ti].SourcesStatus[si].Stats.corrupted() {
+					return true
+				}
+				for di := range js.TasksStatus[ti].SourcesStatus[si].DestinationsStatus {
+					if js.TasksStatus[ti].SourcesStatus[si].DestinationsStatus[di].Stats.corrupted() {
+						return true
+					}
+				}
+			}
+		}
+		return false
+	}
+	fixCorrupted := func() {
+		for ti := range js.TasksStatus {
+			for si := range js.TasksStatus[ti].SourcesStatus {
+				js.TasksStatus[ti].SourcesStatus[si].Stats.fixCorrupted()
+				js.TasksStatus[ti].SourcesStatus[si].Completed = js.TasksStatus[ti].SourcesStatus[si].Stats.completed()
+				for di := range js.TasksStatus[ti].SourcesStatus[si].DestinationsStatus {
+					js.TasksStatus[ti].SourcesStatus[si].DestinationsStatus[di].Stats.fixCorrupted()
+					js.TasksStatus[ti].SourcesStatus[si].DestinationsStatus[di].Completed = js.TasksStatus[ti].SourcesStatus[si].DestinationsStatus[di].Stats.completed()
+				}
+			}
+		}
+	}
+	if isCorrupted() {
+		corruptedJson, _ := json.Marshal(js)
+		log.Warnw("Corrupted job status stats detected, fixing", "job_status", string(corruptedJson))
+		fixCorrupted()
+	}
+}
+
 type TaskStatus struct {
 	ID            string         `json:"id"`
 	SourcesStatus []SourceStatus `json:"sources"`
diff --git a/services/rsources/rsources_test.go b/services/rsources/rsources_test.go
new file mode 100644
index 0000000000..0a0c385846
--- /dev/null
+++ b/services/rsources/rsources_test.go
@@ -0,0 +1,190 @@
+package rsources_test
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/rudderlabs/rudder-go-kit/logger"
+	"github.com/rudderlabs/rudder-server/services/rsources"
+)
+
+func TestCorruptedJobStatus(t *testing.T) {
+	t.Run("Corrupted Source Stats", func(t *testing.T) {
+		js := rsources.JobStatus{
+			ID: "job-id",
+			TasksStatus: []rsources.TaskStatus{
+				{
+					ID: "task-id",
+					SourcesStatus: []rsources.SourceStatus{
+						{
+							ID:        "source-id",
+							Completed: false,
+							Stats: rsources.Stats{
+								In:     2,
+								Out:    3,
+								Failed: 0,
+							},
+							DestinationsStatus: []rsources.DestinationStatus{
+								{
+									ID:        "destination-id",
+									Completed: false,
+									Stats: rsources.Stats{
+										In:     3,
+										Out:    2,
+										Failed: 1,
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+
+		js.FixCorruptedStats(logger.NOP)
+		require.EqualValues(t, true, js.TasksStatus[0].SourcesStatus[0].Completed)
+		require.EqualValues(t, 3, js.TasksStatus[0].SourcesStatus[0].Stats.In)
+		require.EqualValues(t, 3, js.TasksStatus[0].SourcesStatus[0].Stats.Out)
+		require.EqualValues(t, 0, js.TasksStatus[0].SourcesStatus[0].Stats.Failed)
+
+		require.EqualValues(t, true, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Completed)
+		require.EqualValues(t, 3, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.In)
+		require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Out)
+		require.EqualValues(t, 1, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Failed)
+	})
+
+	t.Run("Corrupted Destination Stats", func(t *testing.T) {
+		js := rsources.JobStatus{
+			ID: "job-id",
+			TasksStatus: []rsources.TaskStatus{
+				{
+					ID: "task-id",
+					SourcesStatus: []rsources.SourceStatus{
+						{
+							ID:        "source-id",
+							Completed: true,
+							Stats: rsources.Stats{
+								In:     2,
+								Out:    2,
+								Failed: 0,
+							},
+							DestinationsStatus: []rsources.DestinationStatus{
+								{
+									ID:        "destination-id",
+									Completed: false,
+									Stats: rsources.Stats{
+										In:     2,
+										Out:    2,
+										Failed: 1,
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+
+		js.FixCorruptedStats(logger.NOP)
+
+		require.EqualValues(t, true, js.TasksStatus[0].SourcesStatus[0].Completed)
+		require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].Stats.In)
+		require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].Stats.Out)
+		require.EqualValues(t, 0, js.TasksStatus[0].SourcesStatus[0].Stats.Failed)
+
+		require.EqualValues(t, true, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Completed)
+		require.EqualValues(t, 3, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.In)
+		require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Out)
+		require.EqualValues(t, 1, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Failed)
+	})
+
+	t.Run("Corrupted Source & Destination Stats", func(t *testing.T) {
+		js := rsources.JobStatus{
+			ID: "job-id",
+			TasksStatus: []rsources.TaskStatus{
+				{
+					ID: "task-id",
+					SourcesStatus: []rsources.SourceStatus{
+						{
+							ID:        "source-id",
+							Completed: false,
+							Stats: rsources.Stats{
+								In:     1,
+								Out:    2,
+								Failed: 0,
+							},
+							DestinationsStatus: []rsources.DestinationStatus{
+								{
+									ID:        "destination-id",
+									Completed: false,
+									Stats: rsources.Stats{
+										In:     2,
+										Out:    2,
+										Failed: 1,
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+
+		js.FixCorruptedStats(logger.NOP)
+		require.EqualValues(t, true, js.TasksStatus[0].SourcesStatus[0].Completed)
+		require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].Stats.In)
+		require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].Stats.Out)
+		require.EqualValues(t, 0, js.TasksStatus[0].SourcesStatus[0].Stats.Failed)
+
+		require.EqualValues(t, true, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Completed)
+		require.EqualValues(t, 3, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.In)
+		require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Out)
+		require.EqualValues(t, 1, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Failed)
+	})
+
+	t.Run("Nothing is corrupted", func(t *testing.T) {
+		js := rsources.JobStatus{
+			ID: "job-id",
+			TasksStatus: []rsources.TaskStatus{
+				{
+					ID: "task-id",
+					SourcesStatus: []rsources.SourceStatus{
+						{
+							ID:        "source-id",
+							Completed: false,
+							Stats: rsources.Stats{
+								In:     3,
+								Out:    2,
+								Failed: 0,
+							},
+							DestinationsStatus: []rsources.DestinationStatus{
+								{
+									ID:        "destination-id",
+									Completed: false,
+									Stats: rsources.Stats{
+										In:     2,
+										Out:    1,
+										Failed: 0,
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+
+		js.FixCorruptedStats(logger.NOP)
+
+		require.EqualValues(t, false, js.TasksStatus[0].SourcesStatus[0].Completed)
+		require.EqualValues(t, 3, js.TasksStatus[0].SourcesStatus[0].Stats.In)
+		require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].Stats.Out)
+		require.EqualValues(t, 0, js.TasksStatus[0].SourcesStatus[0].Stats.Failed)
+
+		require.EqualValues(t, false, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Completed)
+		require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.In)
+		require.EqualValues(t, 1, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Out)
+		require.EqualValues(t, 0, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Failed)
+	})
+}