Skip to content

Commit

Permalink
completed the rest
Browse files Browse the repository at this point in the history
  • Loading branch information
william feng committed Jun 4, 2020
1 parent 55dca71 commit b0c34c2
Show file tree
Hide file tree
Showing 14 changed files with 34 additions and 33 deletions.
2 changes: 1 addition & 1 deletion filter/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

// Authenticator defines how an Authenticator works.
// Custom Authenticator must be set by calling auth.SetAuthenticator before use.
// Custom Authenticator must be set by calling auth.SetAuthenticator before use.
type Authenticator interface {

// Sign adds signature to the invocation
Expand Down
8 changes: 4 additions & 4 deletions filter/filter_impl/active_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ func init() {
extension.SetFilter(active, GetActiveFilter)
}

// ActiveFilter ...
// ActiveFilter tracks the requests status
type ActiveFilter struct {
}

// Invoke ...
// Invoke starts to record the requests status
func (ef *ActiveFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking active filter. %v,%v", invocation.MethodName(), len(invocation.Arguments()))
invocation.(*invocation2.RPCInvocation).SetAttachments(dubboInvokeStartTime, strconv.FormatInt(protocol.CurrentTimeMillis(), 10))
protocol.BeginCount(invoker.GetUrl(), invocation.MethodName())
return invoker.Invoke(ctx, invocation)
}

// OnResponse ...
// OnResponse update the active count base on the request result.
func (ef *ActiveFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).AttachmentsByKey(dubboInvokeStartTime, "0"), 10, 64)
if err != nil {
Expand All @@ -64,7 +64,7 @@ func (ef *ActiveFilter) OnResponse(ctx context.Context, result protocol.Result,
return result
}

// GetActiveFilter ...
// GetActiveFilter creates ActiveFilter instance
func GetActiveFilter() filter.Filter {
return &ActiveFilter{}
}
2 changes: 1 addition & 1 deletion filter/filter_impl/auth/consumer_sign.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (csf *ConsumerSignFilter) Invoke(ctx context.Context, invoker protocol.Invo
return invoker.Invoke(ctx, invocation)
}

// OnResponse dummy process, return the result directly
// OnResponse dummy process, returns the result directly
func (csf *ConsumerSignFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}
Expand Down
2 changes: 1 addition & 1 deletion filter/filter_impl/auth/provider_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (paf *ProviderAuthFilter) Invoke(ctx context.Context, invoker protocol.Invo
return invoker.Invoke(ctx, invocation)
}

// OnResponse dummy process, return the result directly
// OnResponse dummy process, returns the result directly
func (paf *ProviderAuthFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}
Expand Down
6 changes: 3 additions & 3 deletions filter/filter_impl/echo_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ func init() {
extension.SetFilter(ECHO, GetFilter)
}

// EchoFilter
// EchoFilter health check
// RPCService need a Echo method in consumer, if you want to use EchoFilter
// eg:
// Echo func(ctx context.Context, arg interface{}, rsp *Xxx) error
type EchoFilter struct{}

// Invoke ...
// Invoke response to the callers with its first argument.
func (ef *EchoFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking echo filter.")
logger.Debugf("%v,%v", invocation.MethodName(), len(invocation.Arguments()))
Expand All @@ -58,7 +58,7 @@ func (ef *EchoFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invo
return invoker.Invoke(ctx, invocation)
}

// OnResponse ...
// OnResponse dummy process, returns the result directly
func (ef *EchoFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker,
_ protocol.Invocation) protocol.Result {

Expand Down
9 changes: 4 additions & 5 deletions filter/filter_impl/execute_limit_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ func init() {
extension.SetFilter(name, GetExecuteLimitFilter)
}

// ExecuteLimitFilter will limit the number of in-progress request and it's thread-safe.
/**
* ExecuteLimitFilter
* The filter will limit the number of in-progress request and it's thread-safe.
* example:
* "UserProvider":
* registry: "hangzhouzk"
Expand Down Expand Up @@ -80,7 +79,7 @@ type ExecuteState struct {
concurrentCount int64
}

// Invoke ...
// Invoke judges whether the current processing requests over the threshold
func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
methodConfigPrefix := "methods." + invocation.MethodName() + "."
ivkURL := invoker.GetUrl()
Expand Down Expand Up @@ -122,7 +121,7 @@ func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invok
return invoker.Invoke(ctx, invocation)
}

// OnResponse ...
// OnResponse dummy process, returns the result directly
func (ef *ExecuteLimitFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
return result
}
Expand All @@ -138,7 +137,7 @@ func (state *ExecuteState) decrease() {
var executeLimitOnce sync.Once
var executeLimitFilter *ExecuteLimitFilter

// GetExecuteLimitFilter ...
// GetExecuteLimitFilter returns the singleton ExecuteLimitFilter instance
func GetExecuteLimitFilter() filter.Filter {
executeLimitOnce.Do(func() {
executeLimitFilter = &ExecuteLimitFilter{
Expand Down
6 changes: 3 additions & 3 deletions filter/filter_impl/generic_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func init() {
// GenericFilter ...
type GenericFilter struct{}

// Invoke ...
// Invoke turns the parameters to map for generic method
func (ef *GenericFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 {
oldArguments := invocation.Arguments()
Expand All @@ -73,13 +73,13 @@ func (ef *GenericFilter) Invoke(ctx context.Context, invoker protocol.Invoker, i
return invoker.Invoke(ctx, invocation)
}

// OnResponse ...
// OnResponse dummy process, returns the result directly
func (ef *GenericFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker,
_ protocol.Invocation) protocol.Result {
return result
}

// GetGenericFilter ...
// GetGenericFilter returns GenericFilter instance
func GetGenericFilter() filter.Filter {
return &GenericFilter{}
}
Expand Down
2 changes: 2 additions & 0 deletions filter/filter_impl/graceful_shutdown_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type gracefulShutdownFilter struct {
shutdownConfig *config.ShutdownConfig
}

// Invoke add the requests count and block the new requests if application is closing
func (gf *gracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
if gf.rejectNewRequest() {
logger.Info("The application is closing, new request will be rejected.")
Expand All @@ -62,6 +63,7 @@ func (gf *gracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.I
return invoker.Invoke(ctx, invocation)
}

// OnResponse reduce the number of active processes then return the process result
func (gf *gracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
atomic.AddInt32(&gf.activeCount, -1)
// although this isn't thread safe, it won't be a problem if the gf.rejectNewRequest() is true.
Expand Down
16 changes: 8 additions & 8 deletions filter/filter_impl/hystrix_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ var (

//The filter in the server end of dubbo-go can't get the invoke result for now,
//this filter ONLY works in CLIENT end (consumer side) temporarily
//Only after the callService logic is integrated into the filter chain of server end can this filter be used,
//Only after the callService logic is integrated into the filter chain of server end then the filter can be used,
//which will be done soon
func init() {
extension.SetFilter(HYSTRIX_CONSUMER, GetHystrixFilterConsumer)
extension.SetFilter(HYSTRIX_PROVIDER, GetHystrixFilterProvider)
}

// HystrixFilterError ...
// HystrixFilterError implements error interface
type HystrixFilterError struct {
err error
failByHystrix bool
Expand All @@ -72,12 +72,12 @@ func (hfError *HystrixFilterError) Error() string {
return hfError.err.Error()
}

// FailByHystrix ...
// FailByHystrix returns whether the fails causing by Hystrix
func (hfError *HystrixFilterError) FailByHystrix() bool {
return hfError.failByHystrix
}

// NewHystrixFilterError ...
// NewHystrixFilterError return a HystrixFilterError instance
func NewHystrixFilterError(err error, failByHystrix bool) error {
return &HystrixFilterError{
err: err,
Expand All @@ -92,7 +92,7 @@ type HystrixFilter struct {
ifNewMap sync.Map
}

// Invoke ...
// Invoke is an implentation of filter, provides Hystrix pattern latency and fault tolerance
func (hf *HystrixFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
cmdName := fmt.Sprintf("%s&method=%s", invoker.GetUrl().Key(), invocation.MethodName())

Expand Down Expand Up @@ -154,12 +154,12 @@ func (hf *HystrixFilter) Invoke(ctx context.Context, invoker protocol.Invoker, i
return result
}

// OnResponse ...
// OnResponse dummy process, returns the result directly
func (hf *HystrixFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}

// GetHystrixFilterConsumer ...
// GetHystrixFilterConsumer returns HystrixFilter instance for consumer
func GetHystrixFilterConsumer() filter.Filter {
//When first called, load the config in
consumerConfigOnce.Do(func() {
Expand All @@ -170,7 +170,7 @@ func GetHystrixFilterConsumer() filter.Filter {
return &HystrixFilter{COrP: true}
}

// GetHystrixFilterProvider ...
// GetHystrixFilterProvider returns HystrixFilter instance for provider
func GetHystrixFilterProvider() filter.Filter {
providerConfigOnce.Do(func() {
if err := initHystrixConfigProvider(); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions filter/filter_impl/token_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ func init() {
extension.SetFilter(TOKEN, GetTokenFilter)
}

// TokenFilter ...
// TokenFilter will verify if the token is valid
type TokenFilter struct{}

// Invoke ...
// Invoke verifies the incoming token with the service configured token
func (tf *TokenFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
invokerTkn := invoker.GetUrl().GetParam(constant.TOKEN_KEY, "")
if len(invokerTkn) > 0 {
Expand All @@ -61,7 +61,7 @@ func (tf *TokenFilter) Invoke(ctx context.Context, invoker protocol.Invoker, inv
return invoker.Invoke(ctx, invocation)
}

// OnResponse ...
// OnResponse dummy process, returns the result directly
func (tf *TokenFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type SlidingWindowTpsLimitStrategyImpl struct {
queue *list.List
}

// IsAllowable determins whether the number of requests within the time window overs the threshold
// IsAllowable determins whether the number of requests within the time window overs the threshold
// It is thread-safe.
func (impl *SlidingWindowTpsLimitStrategyImpl) IsAllowable() bool {
impl.mutex.Lock()
Expand Down
2 changes: 1 addition & 1 deletion filter/filter_impl/tps_limit_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (t TpsLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, in
return invoker.Invoke(ctx, invocation)
}

// OnResponse dummy process, return the result directly
// OnResponse dummy process, returns the result directly
func (t TpsLimitFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker,
_ protocol.Invocation) protocol.Result {
return result
Expand Down
2 changes: 1 addition & 1 deletion filter/handler/rejected_execution_handler_only_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func init() {
var onlyLogHandlerInstance *OnlyLogRejectedExecutionHandler
var onlyLogHandlerOnce sync.Once

// OnlyLogRejectedExecutionHandler implements the RejectedExecutionHandler
/**
* OnlyLogRejectedExecutionHandler
* This implementation only logs the invocation info.
* it always return en error inside the result.
* "UserProvider":
Expand Down
2 changes: 1 addition & 1 deletion filter/tps_limit_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package filter

// TpsLimitStrategy defines how to do the TPS limiting in method level.
// TpsLimitStrategy defines how to do the TPS limiting in method level.
/*
* please register your implementation by invoking SetTpsLimitStrategy
* "UserProvider":
Expand Down

0 comments on commit b0c34c2

Please sign in to comment.