diff --git a/framework/components/dockercompose/.changeset/v0.1.15.md b/framework/components/dockercompose/.changeset/v0.1.15.md new file mode 100644 index 000000000..a04d1628e --- /dev/null +++ b/framework/components/dockercompose/.changeset/v0.1.15.md @@ -0,0 +1 @@ +- Even more robust check if error is retryable for `Red Panda`'s schema registration \ No newline at end of file diff --git a/framework/components/dockercompose/chip_ingress_set/protos.go b/framework/components/dockercompose/chip_ingress_set/protos.go index eaa8c657d..f0083fec3 100644 --- a/framework/components/dockercompose/chip_ingress_set/protos.go +++ b/framework/components/dockercompose/chip_ingress_set/protos.go @@ -557,6 +557,10 @@ func checkSchemaExists(registryURL, subject string) (int, bool) { maxAttempts := uint(10) var resp *http.Response existErr := retry.Do(func() error { + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + var err error resp, err = http.Get(url) if err != nil { @@ -630,6 +634,11 @@ func registerSingleProto( var resp *http.Response registerErr := retry.Do(func() error { + // Close previous response before next attempt + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + var respErr error resp, respErr = client.Do(&http.Request{ Method: "POST", @@ -644,11 +653,17 @@ func registerSingleProto( } if resp.StatusCode >= 300 { - data, dataErr := io.ReadAll(resp.Body) - if dataErr != nil { - return errors.Wrap(dataErr, "failed to read response body") + body, bodyErr := io.ReadAll(resp.Body) + resp.Body.Close() + if bodyErr != nil { + return errors.Wrap(bodyErr, "failed to read response body") + } + + if resp.StatusCode >= 400 && resp.StatusCode < 500 { + return retry.Unrecoverable(fmt.Errorf("schema registry error (%d): %s", resp.StatusCode, string(body))) } - return fmt.Errorf("schema registry error (%d): %s", resp.StatusCode, data) + + return fmt.Errorf("schema registry error (%d): %s", resp.StatusCode, string(body)) } return nil @@ -657,7 +672,13 @@ func registerSingleProto( }), retry.RetryIf(func(err error) bool { // we don't want to retry all errors, because some of them are are expected (e.g. missing dependencies) // and will be handled by higher-level code - return isRetryableError(err) + shouldRetry := isRetryableError(err) + + if !shouldRetry { + framework.L.Warn().Msgf("Determined to not retry error: %T (error: %s)", err, err.Error()) + } + + return shouldRetry })) if registerErr != nil { return 0, errors.Wrapf(registerErr, "failed to register schema for subject %s", subject) @@ -699,16 +720,6 @@ func stripFolderPrefix(path string, prefixes []string) string { return path } -// transformSchemaContent removes folder prefixes from import statements in protobuf source -func transformSchemaContent(content string, prefixes []string) string { - modified := content - for _, prefix := range prefixes { - // Transform import statements like "workflows/v1/" to "v1/" - modified = strings.ReplaceAll(modified, `"`+prefix, `"`) - } - return modified -} - // buildDependencyGraph builds a dependency graph from protobuf files func buildDependencyGraph(protoMap map[string]string) (map[string][]string, error) { dependencies := make(map[string][]string) @@ -804,15 +815,32 @@ func isRetryableError(err error) bool { return false } - retryableErrorMessages := []string{ - "connection reset by peer", - "EOF", - } - - for _, msg := range retryableErrorMessages { - if strings.Contains(err.Error(), msg) { + var urlErr *url.Error + if errors.As(err, &urlErr) { + // Retry on timeouts + var ne net.Error + if errors.As(urlErr, &ne) && ne.Timeout() { return true } + // Fall through to check the cause too + err = urlErr.Err } - return false + + // network-layer errors worth retrying (dial/read/write problems) + var opErr *net.OpError + if errors.As(err, &opErr) { + return true + } + + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return true + } + + msg := err.Error() + return strings.Contains(msg, "connection reset by peer") || + strings.Contains(msg, "broken pipe") || + strings.Contains(msg, "connection refused") || + strings.Contains(msg, "http2: stream error") || + strings.Contains(msg, "EOF") || + strings.Contains(strings.ToLower(msg), "timeout") }