Skip to content

Commit

Permalink
Merge pull request #75 from fieldryand/fix-leaking-goroutines
Browse files Browse the repository at this point in the history
Fix leaking goroutines
  • Loading branch information
fieldryand authored Feb 24, 2024
2 parents cc280ec + e71bbbd commit cded453
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 43 deletions.
24 changes: 14 additions & 10 deletions operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ func (o Get) Run() (interface{}, error) {
res, err := o.Client.Get(o.URL)
if err != nil {
return nil, err
} else if res.StatusCode < 200 || res.StatusCode > 299 {
}
defer res.Body.Close()

if res.StatusCode < 200 || res.StatusCode > 299 {
return nil, fmt.Errorf("Received status code %v", res.StatusCode)
} else {
content, err := io.ReadAll(res.Body)
res.Body.Close()
return string(content), err
}

content, err := io.ReadAll(res.Body)
return string(content), err
}

// Post makes a POST request.
Expand All @@ -60,11 +62,13 @@ func (o Post) Run() (interface{}, error) {
res, err := o.Client.Post(o.URL, "application/json", o.Body)
if err != nil {
return nil, err
} else if res.StatusCode < 200 || res.StatusCode > 299 {
}
defer res.Body.Close()

if res.StatusCode < 200 || res.StatusCode > 299 {
return nil, fmt.Errorf("Received status code %v", res.StatusCode)
} else {
content, err := io.ReadAll(res.Body)
res.Body.Close()
return string(content), err
}

content, err := io.ReadAll(res.Body)
return string(content), err
}
18 changes: 18 additions & 0 deletions operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ func TestGetNotFound(t *testing.T) {
}
}

func TestGetInvalid(t *testing.T) {
client := &http.Client{}
_, err := Get{client, ""}.Run()

if err == nil {
t.Errorf("Expected an error")
}
}

func TestPostSuccess(t *testing.T) {
expected := "OK"
srv := httptest.NewServer(
Expand Down Expand Up @@ -83,3 +92,12 @@ func TestPostNotFound(t *testing.T) {
t.Errorf("Expected an error")
}
}

func TestPostInvalid(t *testing.T) {
client := &http.Client{}
_, err := Post{client, "", bytes.NewBuffer([]byte(""))}.Run()

if err == nil {
t.Errorf("Expected an error")
}
}
50 changes: 18 additions & 32 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,34 @@ func (g *Goflow) stream(keepOpen bool) func(*gin.Context) {

history := make([]*execution, 0)

// open a channel for live executions
chanStream := make(chan *execution)

// periodically push the list of job runs into the stream
go func() {
defer close(chanStream)
for {
for jobname := range g.Jobs {
executions, _ := readExecutions(g.Store, jobname)
for _, e := range executions {
c.Stream(func(w io.Writer) bool {
for jobname := range g.Jobs {
executions, _ := readExecutions(g.Store, jobname)
for _, e := range executions {

// make sure it wasn't already sent
inHistory := false
// make sure it wasn't already sent
inHistory := false

for _, h := range history {
if e.ID == h.ID && e.ModifiedTimestamp == h.ModifiedTimestamp {
inHistory = true
}
for _, h := range history {
if e.ID == h.ID && e.ModifiedTimestamp == h.ModifiedTimestamp {
inHistory = true
}
}

if !inHistory {
if job != "" && job == e.JobName {
chanStream <- e
history = append(history, e)
} else if job == "" {
chanStream <- e
history = append(history, e)
}
if !inHistory {
if (job != "" && job == e.JobName) || job == "" {
c.SSEvent("message", e)
history = append(history, e)
}

}

}
time.Sleep(time.Second * 1)
}
}()

c.Stream(func(w io.Writer) bool {
if msg, ok := <-chanStream; ok {
c.SSEvent("message", msg)
return keepOpen
}
return false
time.Sleep(time.Second * 1)

return keepOpen
})
}

Expand Down
2 changes: 1 addition & 1 deletion ui/src/plain.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ function updateTaskStateCircles(execution) {
for (i in execution.tasks) {
const t = execution.tasks[i];
const s = stateColor(t.state);
updateStateCircles("task-table", `${execution.id}-${t}`, t.name, s, execution.submitted);
updateStateCircles("task-table", `${execution.id}-${t.name}`, t.name, s, execution.submitted);
}
}

Expand Down

0 comments on commit cded453

Please sign in to comment.