Skip to content

Commit

Permalink
chore: autofix corrupted job-status stats
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Dec 11, 2023
1 parent cfe458a commit eec2f8c
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 1 deletion.
3 changes: 3 additions & 0 deletions services/rsources/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ func (h *handler) getStatus(w http.ResponseWriter, r *http.Request) {
}
return
}

jobStatus.FixCorruptedStats(h.logger)

w.Header().Set("Content-Type", "application/json")
w.Header().Set("Cache-Control", "no-store")
w.WriteHeader(http.StatusOK)
Expand Down
48 changes: 47 additions & 1 deletion services/rsources/rsources.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,61 @@ type Stats struct {
Failed uint `json:"failed"`
}

func (r Stats) completed() bool {
func (r *Stats) completed() bool {
return r.In == r.Out+r.Failed
}

func (r *Stats) corrupted() bool {
return r.In < r.Out+r.Failed
}

func (r *Stats) fixCorrupted() {
if r.corrupted() {
r.In = r.Out + r.Failed
}
}

type JobStatus struct {
ID string `json:"id"`
TasksStatus []TaskStatus `json:"tasks"`
}

func (js *JobStatus) FixCorruptedStats(log logger.Logger) {
isCorrupted := func() bool {
for ti := range js.TasksStatus {
for si := range js.TasksStatus[ti].SourcesStatus {
if js.TasksStatus[ti].SourcesStatus[si].Stats.corrupted() {
return true
}
for di := range js.TasksStatus[ti].SourcesStatus[si].DestinationsStatus {
if js.TasksStatus[ti].SourcesStatus[si].DestinationsStatus[di].Stats.corrupted() {
return true
}
}
}
}
return false
}
fixCorrupted := func() {
for ti := range js.TasksStatus {
for si := range js.TasksStatus[ti].SourcesStatus {
js.TasksStatus[ti].SourcesStatus[si].Stats.fixCorrupted()
js.TasksStatus[ti].SourcesStatus[si].Completed = js.TasksStatus[ti].SourcesStatus[si].Stats.completed()
for di := range js.TasksStatus[ti].SourcesStatus[si].DestinationsStatus {
js.TasksStatus[ti].SourcesStatus[si].DestinationsStatus[di].Stats.fixCorrupted()
js.TasksStatus[ti].SourcesStatus[si].DestinationsStatus[di].Completed = js.TasksStatus[ti].SourcesStatus[si].DestinationsStatus[di].Stats.completed()
}
}
}
}
if isCorrupted() {
corruptedJson, _ := json.Marshal(js)
log.Warnw("Corrupted job status stats detected, fixing", "job_status", string(corruptedJson))
fixCorrupted()
}

}

type TaskStatus struct {
ID string `json:"id"`
SourcesStatus []SourceStatus `json:"sources"`
Expand Down
191 changes: 191 additions & 0 deletions services/rsources/rsources_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package rsources_test

import (
"testing"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/stretchr/testify/require"
)

func TestCorruptedJobStatus(t *testing.T) {

t.Run("Corrupted Source Stats", func(t *testing.T) {
js := rsources.JobStatus{
ID: "job-id",
TasksStatus: []rsources.TaskStatus{
{
ID: "task-id",
SourcesStatus: []rsources.SourceStatus{
{
ID: "source-id",
Completed: false,
Stats: rsources.Stats{
In: 2,
Out: 3,
Failed: 0,
},
DestinationsStatus: []rsources.DestinationStatus{
{
ID: "destination-id",
Completed: false,
Stats: rsources.Stats{
In: 3,
Out: 2,
Failed: 1,
},
},
},
},
},
},
},
}

js.FixCorruptedStats(logger.NOP)
require.EqualValues(t, true, js.TasksStatus[0].SourcesStatus[0].Completed)
require.EqualValues(t, 3, js.TasksStatus[0].SourcesStatus[0].Stats.In)
require.EqualValues(t, 3, js.TasksStatus[0].SourcesStatus[0].Stats.Out)
require.EqualValues(t, 0, js.TasksStatus[0].SourcesStatus[0].Stats.Failed)

require.EqualValues(t, true, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Completed)
require.EqualValues(t, 3, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.In)
require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Out)
require.EqualValues(t, 1, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Failed)
})

t.Run("Corrupted Destination Stats", func(t *testing.T) {
js := rsources.JobStatus{
ID: "job-id",
TasksStatus: []rsources.TaskStatus{
{
ID: "task-id",
SourcesStatus: []rsources.SourceStatus{
{
ID: "source-id",
Completed: true,
Stats: rsources.Stats{
In: 2,
Out: 2,
Failed: 0,
},
DestinationsStatus: []rsources.DestinationStatus{
{
ID: "destination-id",
Completed: false,
Stats: rsources.Stats{
In: 2,
Out: 2,
Failed: 1,
},
},
},
},
},
},
},
}

js.FixCorruptedStats(logger.NOP)

require.EqualValues(t, true, js.TasksStatus[0].SourcesStatus[0].Completed)
require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].Stats.In)
require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].Stats.Out)
require.EqualValues(t, 0, js.TasksStatus[0].SourcesStatus[0].Stats.Failed)

require.EqualValues(t, true, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Completed)
require.EqualValues(t, 3, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.In)
require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Out)
require.EqualValues(t, 1, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Failed)
})

t.Run("Corrupted Source & Destination Stats", func(t *testing.T) {
js := rsources.JobStatus{
ID: "job-id",
TasksStatus: []rsources.TaskStatus{
{
ID: "task-id",
SourcesStatus: []rsources.SourceStatus{
{
ID: "source-id",
Completed: false,
Stats: rsources.Stats{
In: 1,
Out: 2,
Failed: 0,
},
DestinationsStatus: []rsources.DestinationStatus{
{
ID: "destination-id",
Completed: false,
Stats: rsources.Stats{
In: 2,
Out: 2,
Failed: 1,
},
},
},
},
},
},
},
}

js.FixCorruptedStats(logger.NOP)
require.EqualValues(t, true, js.TasksStatus[0].SourcesStatus[0].Completed)
require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].Stats.In)
require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].Stats.Out)
require.EqualValues(t, 0, js.TasksStatus[0].SourcesStatus[0].Stats.Failed)

require.EqualValues(t, true, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Completed)
require.EqualValues(t, 3, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.In)
require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Out)
require.EqualValues(t, 1, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Failed)
})

t.Run("Nothing is corrupted", func(t *testing.T) {
js := rsources.JobStatus{
ID: "job-id",
TasksStatus: []rsources.TaskStatus{
{
ID: "task-id",
SourcesStatus: []rsources.SourceStatus{
{
ID: "source-id",
Completed: false,
Stats: rsources.Stats{
In: 3,
Out: 2,
Failed: 0,
},
DestinationsStatus: []rsources.DestinationStatus{
{
ID: "destination-id",
Completed: false,
Stats: rsources.Stats{
In: 2,
Out: 1,
Failed: 0,
},
},
},
},
},
},
},
}

js.FixCorruptedStats(logger.NOP)

require.EqualValues(t, false, js.TasksStatus[0].SourcesStatus[0].Completed)
require.EqualValues(t, 3, js.TasksStatus[0].SourcesStatus[0].Stats.In)
require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].Stats.Out)
require.EqualValues(t, 0, js.TasksStatus[0].SourcesStatus[0].Stats.Failed)

require.EqualValues(t, false, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Completed)
require.EqualValues(t, 2, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.In)
require.EqualValues(t, 1, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Out)
require.EqualValues(t, 0, js.TasksStatus[0].SourcesStatus[0].DestinationsStatus[0].Stats.Failed)
})

}

0 comments on commit eec2f8c

Please sign in to comment.