Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions framework/components/dockercompose/.changeset/v0.1.15.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Even more robust check if error is retryable for `Red Panda`'s schema registration
74 changes: 51 additions & 23 deletions framework/components/dockercompose/chip_ingress_set/protos.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ func checkSchemaExists(registryURL, subject string) (int, bool) {
maxAttempts := uint(10)
var resp *http.Response
existErr := retry.Do(func() error {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}

var err error
resp, err = http.Get(url)
if err != nil {
Expand Down Expand Up @@ -630,6 +634,11 @@ func registerSingleProto(

var resp *http.Response
registerErr := retry.Do(func() error {
// Close previous response before next attempt
if resp != nil && resp.Body != nil {
resp.Body.Close()
}

var respErr error
resp, respErr = client.Do(&http.Request{
Method: "POST",
Expand All @@ -644,11 +653,17 @@ func registerSingleProto(
}

if resp.StatusCode >= 300 {
data, dataErr := io.ReadAll(resp.Body)
if dataErr != nil {
return errors.Wrap(dataErr, "failed to read response body")
body, bodyErr := io.ReadAll(resp.Body)
resp.Body.Close()
if bodyErr != nil {
return errors.Wrap(bodyErr, "failed to read response body")
}

if resp.StatusCode >= 400 && resp.StatusCode < 500 {
return retry.Unrecoverable(fmt.Errorf("schema registry error (%d): %s", resp.StatusCode, string(body)))
}
return fmt.Errorf("schema registry error (%d): %s", resp.StatusCode, data)

return fmt.Errorf("schema registry error (%d): %s", resp.StatusCode, string(body))
}

return nil
Expand All @@ -657,7 +672,13 @@ func registerSingleProto(
}), retry.RetryIf(func(err error) bool {
// we don't want to retry all errors, because some of them are are expected (e.g. missing dependencies)
// and will be handled by higher-level code
return isRetryableError(err)
shouldRetry := isRetryableError(err)

if !shouldRetry {
framework.L.Warn().Msgf("Determined to not retry error: %T (error: %s)", err, err.Error())
}

return shouldRetry
}))
if registerErr != nil {
return 0, errors.Wrapf(registerErr, "failed to register schema for subject %s", subject)
Expand Down Expand Up @@ -699,16 +720,6 @@ func stripFolderPrefix(path string, prefixes []string) string {
return path
}

// transformSchemaContent removes folder prefixes from import statements in protobuf source
func transformSchemaContent(content string, prefixes []string) string {
modified := content
for _, prefix := range prefixes {
// Transform import statements like "workflows/v1/" to "v1/"
modified = strings.ReplaceAll(modified, `"`+prefix, `"`)
}
return modified
}

// buildDependencyGraph builds a dependency graph from protobuf files
func buildDependencyGraph(protoMap map[string]string) (map[string][]string, error) {
dependencies := make(map[string][]string)
Expand Down Expand Up @@ -804,15 +815,32 @@ func isRetryableError(err error) bool {
return false
}

retryableErrorMessages := []string{
"connection reset by peer",
"EOF",
}

for _, msg := range retryableErrorMessages {
if strings.Contains(err.Error(), msg) {
var urlErr *url.Error
if errors.As(err, &urlErr) {
// Retry on timeouts
var ne net.Error
if errors.As(urlErr, &ne) && ne.Timeout() {
return true
}
// Fall through to check the cause too
err = urlErr.Err
}
return false

// network-layer errors worth retrying (dial/read/write problems)
var opErr *net.OpError
if errors.As(err, &opErr) {
return true
}

if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return true
}

msg := err.Error()
return strings.Contains(msg, "connection reset by peer") ||
strings.Contains(msg, "broken pipe") ||
strings.Contains(msg, "connection refused") ||
strings.Contains(msg, "http2: stream error") ||
strings.Contains(msg, "EOF") ||
strings.Contains(strings.ToLower(msg), "timeout")
}
Loading