Skip to content

Commit

Permalink
support for wrapped errors in the rest of packages
Browse files Browse the repository at this point in the history
  • Loading branch information
3vilhamster committed May 22, 2024
1 parent e1beb94 commit bd63d06
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 40 deletions.
6 changes: 4 additions & 2 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package history

import (
"context"
"errors"
"math/rand"
"sync"
"time"
Expand Down Expand Up @@ -755,7 +756,7 @@ func (c *clientImpl) GetReplicationMessages(
tag.ShardReplicationToken(req),
)
// Returns service busy error to notify replication
if _, ok := err.(*types.ServiceBusyError); ok {
if errors.As(err, new(*types.ServiceBusyError)) {
return err
}
return nil
Expand Down Expand Up @@ -1096,7 +1097,8 @@ redirectLoop:
}
err = op(ctx, peer)
if err != nil {
if s, ok := err.(*types.ShardOwnershipLostError); ok {
var s *types.ShardOwnershipLostError
if errors.As(err, &s) {
// TODO: consider emitting a metric for number of redirects
peer, err = c.peerResolver.FromHostAddress(s.GetOwner())
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion common/dynamicconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package dynamicconfig

import (
"errors"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -73,7 +74,7 @@ func (c *Collection) logError(
if errCount%errCountLogThreshold == 0 {
// log only every 'x' errors to reduce mem allocs and to avoid log noise
filteredKey := getFilteredKeyAsString(key, filters)
if _, ok := err.(*types.EntityNotExistsError); ok {
if errors.As(err, new(*types.EntityNotExistsError)) {
c.logger.Debug("dynamic config not set, use default value", tag.Key(filteredKey))
} else {
c.logger.Warn("Failed to fetch key from dynamic config", tag.Key(filteredKey), tag.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion common/dynamicconfig/configstore/config_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func (csc *configStoreClient) updateValue(name dc.Key, dcValues []*types.Dynamic
return errors.New("timeout error on update")
default:
if err != nil {
if _, ok := err.(*persistence.ConditionFailedError); ok && retryAttempts > 0 {
if errors.As(err, new(*persistence.ConditionFailedError)) && retryAttempts > 0 {
// fetch new config and retry
err := csc.update()
if err != nil {
Expand Down
12 changes: 5 additions & 7 deletions common/ndc/history_resender.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,20 +156,18 @@ func (n *HistoryResenderImpl) SendSingleWorkflowHistory(
historyBatch.versionHistory.GetItems())

err = n.sendReplicationRawRequest(ctx, replicationRequest)
switch err.(type) {
case nil:
// continue to process the events
break
case *types.EntityNotExistsError:
if err == nil {
continue
}
if errors.As(err, new(*types.EntityNotExistsError)) {
// Case 1: the workflow pass the retention period
// Case 2: the workflow is corrupted
if skipTask := n.fixCurrentExecution(ctx, domainID, workflowID, runID); skipTask {
return ErrSkipTask
}
return err
default:
return fmt.Errorf("sending replication request: %w", err)
}
return fmt.Errorf("sending replication request: %w", err)
}
return nil
}
Expand Down
20 changes: 8 additions & 12 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,24 +286,19 @@ func IsServiceTransientError(err error) bool {

// IsEntityNotExistsError checks if the error is an entity not exists error.
func IsEntityNotExistsError(err error) bool {
_, ok := err.(*types.EntityNotExistsError)
return ok
return errors.As(err, new(*types.EntityNotExistsError))
}

// IsServiceBusyError checks if the error is a service busy error.
func IsServiceBusyError(err error) bool {
switch err.(type) {
case *types.ServiceBusyError:
return true
}
return false
return errors.As(err, new(*types.ServiceBusyError))
}

// IsContextTimeoutError checks if the error is context timeout error
func IsContextTimeoutError(err error) bool {
switch err := err.(type) {
case *types.InternalServiceError:
return err.Message == context.DeadlineExceeded.Error()
var internalErr *types.InternalServiceError
if errors.As(err, &internalErr) {
return internalErr.Message == context.DeadlineExceeded.Error()
}
return err == context.DeadlineExceeded || yarpcerrors.IsDeadlineExceeded(err)
}
Expand Down Expand Up @@ -933,7 +928,8 @@ func ConvertDynamicConfigMapPropertyToIntMap(dcValue map[string]interface{}) (ma

// IsStickyTaskConditionError is error from matching engine
func IsStickyTaskConditionError(err error) bool {
if e, ok := err.(*types.InternalServiceError); ok {
var e *types.InternalServiceError
if errors.As(err, &e) {
return e.GetMessage() == StickyTaskConditionFailedErrorMsg
}
return false
Expand Down Expand Up @@ -977,7 +973,7 @@ func ConvertErrToGetTaskFailedCause(err error) types.GetTaskFailedCause {
if IsServiceBusyError(err) {
return types.GetTaskFailedCauseServiceBusy
}
if _, ok := err.(*types.ShardOwnershipLostError); ok {
if errors.As(err, new(*types.ShardOwnershipLostError)) {
return types.GetTaskFailedCauseShardOwnershipLost
}
return types.GetTaskFailedCauseUncategorized
Expand Down
3 changes: 2 additions & 1 deletion host/archival_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"strconv"
"time"
Expand Down Expand Up @@ -208,7 +209,7 @@ func (s *IntegrationSuite) isMutableStateDeleted(domainID string, execution *typ
ctx, cancel := context.WithTimeout(context.Background(), defaultTestPersistenceTimeout)
_, err := s.testCluster.testBase.ExecutionManager.GetWorkflowExecution(ctx, request)
cancel()
if _, ok := err.(*types.EntityNotExistsError); ok {
if errors.As(err, new(*types.EntityNotExistsError)) {
return true
}
time.Sleep(retryBackoffTime)
Expand Down
3 changes: 2 additions & 1 deletion host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package host
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -812,7 +813,7 @@ func (c *cadenceImpl) createSystemDomain() error {
FailoverVersion: common.EmptyVersion,
})
if err != nil {
if _, ok := err.(*types.DomainAlreadyExistsError); ok {
if errors.As(err, new(*types.DomainAlreadyExistsError)) {
return nil
}
return fmt.Errorf("failed to create cadence-system domain: %v", err)
Expand Down
13 changes: 7 additions & 6 deletions host/query_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ func (s *IntegrationSuite) TestQueryWorkflow_Sticky() {
}
queryResult = <-queryResultCh
s.NotNil(queryResult.Err)
queryFailError, ok := queryResult.Err.(*types.QueryFailedError)
s.True(ok)
var queryFailError *types.QueryFailedError
s.True(errors.As(err, &queryFailError))
s.Equal("unknown-query-type", queryFailError.Message)
}

Expand Down Expand Up @@ -523,8 +523,8 @@ func (s *IntegrationSuite) TestQueryWorkflow_NonSticky() {
}
queryResult = <-queryResultCh
s.NotNil(queryResult.Err)
queryFailError, ok := queryResult.Err.(*types.QueryFailedError)
s.True(ok)
var queryFailError *types.QueryFailedError
s.True(errors.As(err, &queryFailError))
s.Equal("unknown-query-type", queryFailError.Message)

// advance the state of the decider
Expand Down Expand Up @@ -1416,6 +1416,7 @@ func (s *IntegrationSuite) TestQueryWorkflow_BeforeFirstDecision() {
},
})
s.Nil(queryResp)
s.IsType(&types.QueryFailedError{}, err)
s.Equal("workflow must handle at least one decision task before it can be queried", err.(*types.QueryFailedError).Message)
var queryErr *types.QueryFailedError
s.True(errors.As(err, &queryErr), "wrong error: %v", err)
s.Equal("workflow must handle at least one decision task before it can be queried", queryErr.Message)
}
8 changes: 6 additions & 2 deletions host/signal_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -1574,7 +1575,9 @@ func (s *IntegrationSuite) TestSignalWithStartWorkflow_IDReusePolicy() {
cancel()
s.Nil(resp)
s.Error(err)
errMsg := err.(*types.WorkflowExecutionAlreadyStartedError).GetMessage()
var alreadyStartedErr *types.WorkflowExecutionAlreadyStartedError
s.True(errors.As(err, &alreadyStartedErr))
errMsg := alreadyStartedErr.GetMessage()
s.True(strings.Contains(errMsg, "reject duplicate workflow ID"))

// test policy WorkflowIDReusePolicyAllowDuplicateFailedOnly
Expand All @@ -1585,7 +1588,8 @@ func (s *IntegrationSuite) TestSignalWithStartWorkflow_IDReusePolicy() {
cancel()
s.Nil(resp)
s.Error(err)
errMsg = err.(*types.WorkflowExecutionAlreadyStartedError).GetMessage()
s.True(errors.As(err, &alreadyStartedErr))
errMsg = alreadyStartedErr.GetMessage()
s.True(strings.Contains(errMsg, "allow duplicate workflow ID if last run failed"))

// test policy WorkflowIDReusePolicyAllowDuplicate
Expand Down
5 changes: 3 additions & 2 deletions service/frontend/wrappers/clusterredirection/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package clusterredirection

import (
"context"
"errors"
"fmt"

"github.com/uber/cadence/common/cache"
Expand Down Expand Up @@ -235,8 +236,8 @@ func (policy *selectedOrAllAPIsForwardingRedirectionPolicy) withRedirect(ctx con
}

func (policy *selectedOrAllAPIsForwardingRedirectionPolicy) isDomainNotActiveError(err error) (string, bool) {
domainNotActiveErr, ok := err.(*types.DomainNotActiveError)
if !ok {
var domainNotActiveErr *types.DomainNotActiveError
if !errors.As(err, &domainNotActiveErr) {
return "", false
}
return domainNotActiveErr.ActiveCluster, true
Expand Down
11 changes: 6 additions & 5 deletions service/worker/parentclosepolicy/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package parentclosepolicy

import (
"context"
"errors"
"fmt"
"math/rand"
"time"
Expand Down Expand Up @@ -197,12 +198,12 @@ func ProcessorActivity(ctx context.Context, request Request) error {
err = fmt.Errorf("unknown parent close policy: %v", execution.Policy)
}
if err != nil {
switch err.(type) {
case *types.EntityNotExistsError,
*types.WorkflowExecutionAlreadyCompletedError,
*types.CancellationAlreadyRequestedError:
switch {
case errors.As(err, new(*types.EntityNotExistsError)),
errors.As(err, new(*types.WorkflowExecutionAlreadyCompletedError)),
errors.As(err, new(*types.CancellationAlreadyRequestedError)):
err = nil
case *types.DomainNotActiveError:
case errors.As(err, new(*types.DomainNotActiveError)):
var domainEntry *cache.DomainCacheEntry
if domainEntry, err = domainCache.GetDomainByID(domainID); err == nil {
cluster := domainEntry.GetReplicationConfig().ActiveClusterName
Expand Down

0 comments on commit bd63d06

Please sign in to comment.