Skip to content

Commit

Permalink
fix: surface errors
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 19, 2021
1 parent c000e41 commit 6204cd1
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func patchStepStatus(ctx context.Context) {
"sinkStatuses": sinkStatues,
},
})
logger.Info("patching step status (sinks/sources)", "patch", patch)
debug.Info("patching step status (sinks/sources)", "patch", patch)
if _, err := dynamicInterface.
Resource(dfv1.StepGroupVersionResource).
Namespace(namespace).
Expand Down Expand Up @@ -348,6 +348,7 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
if err != nil {
logger.Error(err, "⚠ http →")
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
withLock(func() { sourceStatues.IncErrors(sourceName, replica, err) })
return
}
Expand All @@ -356,6 +357,7 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
if err := toMain(data); err != nil {
logger.Error(err, "⚠ http →")
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
withLock(func() { sourceStatues.IncErrors(sourceName, replica, err) })
} else {
debug.Info("✔ http ")
Expand Down Expand Up @@ -435,7 +437,8 @@ func connectTo(ctx context.Context) (func([]byte) error, error) {
return fmt.Errorf("failed to send message to main via HTTP: %w", err)
}
if resp.StatusCode >= 300 {
return fmt.Errorf("failed to sent message to main via HTTP: %s", resp.Status)
body, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("failed to sent message to main via HTTP: %q %q", resp.Status, body)
}
trace.Info("✔ source → http")
return nil
Expand Down Expand Up @@ -480,16 +483,18 @@ func connectOut(toSink func([]byte) error) {
if err != nil {
logger.Error(err, "failed to read message body from main via HTTP")
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
return
}
trace.Info("◷ http → sink")
if err := toSink(data); err != nil {
logger.Error(err, "failed to send message from main to sink")
w.WriteHeader(500)
return
_, _ = w.Write([]byte(err.Error()))
} else {
trace.Info("✔ http → sink")
w.WriteHeader(200)
}
trace.Info("✔ http → sink")
w.WriteHeader(200)
})
go func() {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
Expand Down Expand Up @@ -561,8 +566,9 @@ func connectSink() (func([]byte) error, error) {
err := func() error {
if resp, err := http.Post(x.URL, "application/octet-stream", bytes.NewBuffer(m)); err != nil {
return err
} else if resp.StatusCode != 200 {
return fmt.Errorf("failed to send HTTP request: %q", resp.Status)
} else if resp.StatusCode >= 300 {
body, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("failed to send HTTP request: %q %q", resp.Status, body)
} else {
return nil
}
Expand Down

0 comments on commit 6204cd1

Please sign in to comment.