Skip to content

Commit

Permalink
fix: correct http POST to use keep-alives
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 22, 2021
1 parent 22b0b8b commit 59669ca
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 22 deletions.
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,31 @@
# Changelog

## v0.0.21 (2021-05-21)

* [22b0b8b](https://github.com/argoproj/argo-workflows/commit/22b0b8b436ea3bef7e0f2aa17fe223e31e51eb30) fix: fix bouncy scaling bug
* [2d661ab](https://github.com/argoproj/argo-workflows/commit/2d661abcca7a1b8e734020690900dc3d2207aff8) test: tidy up

### Contributors

* Alex Collins

## v0.0.20 (2021-05-21)

* [35e7913](https://github.com/argoproj/argo-workflows/commit/35e7913c44a926ac5069b2f2674c2852a750f1e5) chore: make lint
* [de8a19f](https://github.com/argoproj/argo-workflows/commit/de8a19fcb6960af8096ddb3bb944a06c883482b4) fix: enhanced shutdown
* [6eebb68](https://github.com/argoproj/argo-workflows/commit/6eebb688937107704b902273cbafe4e98d35ff33) docs: add 301-kafka-pipeline.yaml
* [f3c6f16](https://github.com/argoproj/argo-workflows/commit/f3c6f16d0af4398c81ab981d397f7096f9c8c51a) docs: update CHANGELOG.md
* [30ce723](https://github.com/argoproj/argo-workflows/commit/30ce7237c3bcdf4e74d6de12aab99f6483588672) chore: code comment
* [94fb293](https://github.com/argoproj/argo-workflows/commit/94fb2937d6fc87b57ef2723da6f95ab95606fc2e) chore: logging
* [16362e4](https://github.com/argoproj/argo-workflows/commit/16362e49207b62e42efe8f25ef131fbe8d2ade0b) test: added kafka example
* [786ae3d](https://github.com/argoproj/argo-workflows/commit/786ae3d67dad1c6cc79ac04e2d1d9c27b361d2e4) build: reduced update interval
* [ad34a96](https://github.com/argoproj/argo-workflows/commit/ad34a966b8fecf99244e01e21e69f854a06a1410) ci: print logs
* [34f5c17](https://github.com/argoproj/argo-workflows/commit/34f5c171798c1643b4771f896f017650550a6bbb) chore: simplify controller

### Contributors

* Alex Collins

## v0.0.19 (2021-05-20)

* [122acc3](https://github.com/argoproj/argo-workflows/commit/122acc33ed8e0d7dbb2a15b2a83a028a2b31d430) docs: added scaling examples
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ $(GOBIN)/goreman:

# Run against the configured Kubernetes cluster in ~/.kube/config
start: generate deploy $(GOBIN)/goreman
kubectl config set-context --current --namespace=argo-dataflow-system
goreman -set-ports=false -logtime=false start
wait:
kubectl -n argo-dataflow-system get pod
Expand Down
18 changes: 11 additions & 7 deletions runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,13 +432,14 @@ func connectTo(ctx context.Context) (func([]byte) error, error) {
})
return func(data []byte) error {
trace.Info("◷ source → http")
resp, err := http.Post("http://localhost:8080/messages", "application/octet-stream", bytes.NewBuffer(data))
if err != nil {
if resp, err := http.Post("http://localhost:8080/messages", "application/octet-stream", bytes.NewBuffer(data)); err != nil {
return fmt.Errorf("failed to send to main: %w", err)
}
if resp.StatusCode >= 300 {
} else {
body, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("failed to send to main: %q %q", resp.Status, body)
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode >= 300 {
return fmt.Errorf("failed to send to main: %q %q", resp.Status, body)
}
}
trace.Info("✔ source → http")
return nil
Expand Down Expand Up @@ -594,9 +595,12 @@ func connectSink() (func([]byte) error, error) {
sinks[sinkName] = func(msg []byte) error {
if resp, err := http.Post(x.URL, "application/octet-stream", bytes.NewBuffer(msg)); err != nil {
return err
} else if resp.StatusCode >= 300 {
} else {
body, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("failed to send HTTP request: %q %q", resp.Status, body)
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode >= 300 {
return fmt.Errorf("failed to send HTTP request: %q %q", resp.Status, body)
}
}
return nil
}
Expand Down
20 changes: 12 additions & 8 deletions runner/util/do.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,24 @@ func Do(ctx context.Context, fn func(msg []byte) ([][]byte, error)) error {
return
}
for _, out := range msgs {
resp, err := http.Post("http://localhost:3569/messages", "application/octet-stream", bytes.NewBuffer(out))
err := func() error {
if resp, err := http.Post("http://localhost:3569/messages", "application/octet-stream", bytes.NewBuffer(out)); err != nil {
return err
} else {
body, _ := ioutil.ReadAll(resp.Body)
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != 200 {
return fmt.Errorf("failed to post message %s: %s", resp.Status, string(body))
}
}
return nil
}()
if err != nil {
logger.Error(err, "failed to post message")
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
return
}
if resp.StatusCode != 200 {
err := fmt.Errorf("failed to post message %s", resp.Status)
logger.Error(err, "failed to post message")
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
return
}
logger.V(6).Info("do", "in", string(in), "out", string(out))
}
w.WriteHeader(200)
Expand Down
22 changes: 15 additions & 7 deletions runtimes/go1-16/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
)
Expand All @@ -24,15 +25,22 @@ func main() {
return
}
for _, msg := range msgs {
resp, err := http.Post("http://localhost:3569/messages", "application/octet-stream", bytes.NewBuffer(msg))
if err != nil {
err := func() error {
if resp, err := http.Post("http://localhost:3569/messages", "application/octet-stream", bytes.NewBuffer(msg)); err != nil {
return err
}else {
body, _ := ioutil.ReadAll(resp.Body)
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != 200 {
return fmt.Errorf("failed to post message: %q %q", resp.Status, string(body))
}
}
return nil
}()
if err!=nil {
println(err, "failed to post message")
w.WriteHeader(500)
return
}
if resp.StatusCode != 200 {
println(err, "failed to post message", resp.Status)
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
return
}
}
Expand Down

0 comments on commit 59669ca

Please sign in to comment.