Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr: Context support #330

Merged
merged 5 commits into from
Jan 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cluster/cluster_impl/available_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package cluster_impl

import (
"context"
"fmt"
)

Expand All @@ -40,7 +41,7 @@ func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}

func (invoker *availableClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *availableClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
Expand All @@ -54,7 +55,7 @@ func (invoker *availableClusterInvoker) Invoke(invocation protocol.Invocation) p

for _, ivk := range invokers {
if ivk.IsAvailable() {
return ivk.Invoke(invocation)
return ivk.Invoke(ctx, invocation)
}
}
return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("no provider available in %v", invokers))}
Expand Down
4 changes: 2 additions & 2 deletions cluster/cluster_impl/available_cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestAvailableClusterInvokerSuccess(t *testing.T) {
invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)

result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})

assert.Equal(t, mockResult, result)
}
Expand All @@ -80,7 +80,7 @@ func TestAvailableClusterInvokerNoAvail(t *testing.T) {

invoker.EXPECT().IsAvailable().Return(false)

result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})

assert.NotNil(t, result.Error())
assert.True(t, strings.Contains(result.Error().Error(), "no provider available"))
Expand Down
7 changes: 5 additions & 2 deletions cluster/cluster_impl/broadcast_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ limitations under the License.

package cluster_impl

import (
"context"
)
import (
"github.com/apache/dubbo-go/cluster"
flycash marked this conversation as resolved.
Show resolved Hide resolved
"github.com/apache/dubbo-go/common/logger"
Expand All @@ -33,7 +36,7 @@ func newBroadcastClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}

func (invoker *broadcastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *broadcastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
Expand All @@ -46,7 +49,7 @@ func (invoker *broadcastClusterInvoker) Invoke(invocation protocol.Invocation) p

var result protocol.Result
for _, ivk := range invokers {
result = ivk.Invoke(invocation)
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
logger.Warnf("broadcast invoker invoke err: %v when use invoker: %v\n", result.Error(), ivk)
err = result.Error()
Expand Down
4 changes: 2 additions & 2 deletions cluster/cluster_impl/broadcast_cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func Test_BroadcastInvokeSuccess(t *testing.T) {

clusterInvoker := registerBroadcast(t, invokers...)

result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
}

Expand Down Expand Up @@ -104,6 +104,6 @@ func Test_BroadcastInvokeFailed(t *testing.T) {

clusterInvoker := registerBroadcast(t, invokers...)

result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockFailedResult.Err, result.Error())
}
11 changes: 6 additions & 5 deletions cluster/cluster_impl/failback_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cluster_impl

import (
"context"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -71,7 +72,7 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
return invoker
}

func (invoker *failbackClusterInvoker) process() {
func (invoker *failbackClusterInvoker) process(ctx context.Context) {
invoker.ticker = time.NewTicker(time.Second * 1)
for range invoker.ticker.C {
// check each timeout task and re-run
Expand Down Expand Up @@ -102,7 +103,7 @@ func (invoker *failbackClusterInvoker) process() {

retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(retryTask.invocation)
result = retryInvoker.Invoke(ctx, retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
invoker.checkRetry(retryTask, result.Error())
Expand All @@ -126,7 +127,7 @@ func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err
}
}

func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
Expand All @@ -150,11 +151,11 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr

ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
//DO INVOKE
result = ivk.Invoke(invocation)
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
invoker.once.Do(func() {
invoker.taskList = queue.New(invoker.failbackTasks)
go invoker.process()
go invoker.process(ctx)
})

taskLen := invoker.taskList.Len()
Expand Down
12 changes: 6 additions & 6 deletions cluster/cluster_impl/failback_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func Test_FailbackSuceess(t *testing.T) {
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)

result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
}

Expand Down Expand Up @@ -102,7 +102,7 @@ func Test_FailbackRetryOneSuccess(t *testing.T) {
return mockSuccResult
})

result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))
Expand Down Expand Up @@ -150,7 +150,7 @@ func Test_FailbackRetryFailed(t *testing.T) {
}

// first call should failed.
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))
Expand Down Expand Up @@ -192,7 +192,7 @@ func Test_FailbackRetryFailed10Times(t *testing.T) {
}).Times(10)

for i := 0; i < 10; i++ {
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))
Expand Down Expand Up @@ -222,14 +222,14 @@ func Test_FailbackOutOfLimit(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(11)

// reached limit
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))

// all will be out of limit
for i := 0; i < 10; i++ {
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))
Expand Down
7 changes: 5 additions & 2 deletions cluster/cluster_impl/failfast_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ limitations under the License.

package cluster_impl

import (
"context"
flycash marked this conversation as resolved.
Show resolved Hide resolved
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/protocol"
Expand All @@ -32,7 +35,7 @@ func newFailFastClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}

func (invoker *failfastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *failfastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
Expand All @@ -47,5 +50,5 @@ func (invoker *failfastClusterInvoker) Invoke(invocation protocol.Invocation) pr
}

ivk := invoker.doSelect(loadbalance, invocation, invokers, nil)
return ivk.Invoke(invocation)
return ivk.Invoke(ctx, invocation)
}
4 changes: 2 additions & 2 deletions cluster/cluster_impl/failfast_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func Test_FailfastInvokeSuccess(t *testing.T) {
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}

invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})

assert.NoError(t, result.Error())
res := result.Result().(rest)
Expand All @@ -89,7 +89,7 @@ func Test_FailfastInvokeFail(t *testing.T) {
mockResult := &protocol.RPCResult{Err: perrors.New("error")}

invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})

assert.NotNil(t, result.Error())
assert.Equal(t, "error", result.Error().Error())
Expand Down
5 changes: 3 additions & 2 deletions cluster/cluster_impl/failover_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cluster_impl

import (
"context"
"strconv"
)

Expand All @@ -43,7 +44,7 @@ func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}

func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {

invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
Expand Down Expand Up @@ -95,7 +96,7 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
}
invoked = append(invoked, ivk)
//DO INVOKE
result = ivk.Invoke(invocation)
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
providers = append(providers, ivk.GetUrl().Key())
continue
Expand Down
10 changes: 5 additions & 5 deletions cluster/cluster_impl/failover_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type rest struct {
success bool
}

func (bi *MockInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (bi *MockInvoker) Invoke(c context.Context, invocation protocol.Invocation) protocol.Result {
count++
var success bool
var err error = nil
Expand Down Expand Up @@ -112,9 +112,9 @@ func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocatio
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := failoverCluster.Join(staticDir)
if len(invocations) > 0 {
return clusterInvoker.Invoke(invocations[0])
return clusterInvoker.Invoke(context.Background(), invocations[0])
}
return clusterInvoker.Invoke(&invocation.RPCInvocation{})
return clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
}
func Test_FailoverInvokeSuccess(t *testing.T) {
urlParams := url.Values{}
Expand Down Expand Up @@ -155,14 +155,14 @@ func Test_FailoverDestroy(t *testing.T) {

invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url, _ := common.NewURL(context.Background(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
invokers = append(invokers, NewMockInvoker(url, 1))
}

staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := failoverCluster.Join(staticDir)
assert.Equal(t, true, clusterInvoker.IsAvailable())
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.NoError(t, result.Error())
count = 0
clusterInvoker.Destroy()
Expand Down
7 changes: 5 additions & 2 deletions cluster/cluster_impl/failsafe_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package cluster_impl

import (
"context"
flycash marked this conversation as resolved.
Show resolved Hide resolved
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
Expand All @@ -42,7 +45,7 @@ func newFailsafeClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}

func (invoker *failsafeClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *failsafeClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)

err := invoker.checkInvokers(invokers, invocation)
Expand All @@ -65,7 +68,7 @@ func (invoker *failsafeClusterInvoker) Invoke(invocation protocol.Invocation) pr

ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
//DO INVOKE
result = ivk.Invoke(invocation)
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
// ignore
logger.Errorf("Failsafe ignore exception: %v.\n", result.Error().Error())
Expand Down
4 changes: 2 additions & 2 deletions cluster/cluster_impl/failsafe_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func Test_FailSafeInvokeSuccess(t *testing.T) {
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}

invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})

assert.NoError(t, result.Error())
res := result.Result().(rest)
Expand All @@ -88,7 +88,7 @@ func Test_FailSafeInvokeFail(t *testing.T) {
mockResult := &protocol.RPCResult{Err: perrors.New("error")}

invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})

assert.NoError(t, result.Error())
assert.Nil(t, result.Result())
Expand Down
5 changes: 3 additions & 2 deletions cluster/cluster_impl/forking_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package cluster_impl

import (
"context"
"errors"
"fmt"
"time"
Expand All @@ -44,7 +45,7 @@ func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}

func (invoker *forkingClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
err := invoker.checkWhetherDestroyed()
if err != nil {
return &protocol.RPCResult{Err: err}
Expand Down Expand Up @@ -75,7 +76,7 @@ func (invoker *forkingClusterInvoker) Invoke(invocation protocol.Invocation) pro
resultQ := queue.New(1)
for _, ivk := range selected {
go func(k protocol.Invoker) {
result := k.Invoke(invocation)
result := k.Invoke(ctx, invocation)
err := resultQ.Put(result)
if err != nil {
logger.Errorf("resultQ put failed with exception: %v.\n", err)
Expand Down
6 changes: 3 additions & 3 deletions cluster/cluster_impl/forking_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func Test_ForkingInvokeSuccess(t *testing.T) {

clusterInvoker := registerForking(t, invokers...)

result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
wg.Wait()
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func Test_ForkingInvokeTimeout(t *testing.T) {

clusterInvoker := registerForking(t, invokers...)

result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.NotNil(t, result)
assert.NotNil(t, result.Error())
wg.Wait()
Expand Down Expand Up @@ -156,7 +156,7 @@ func Test_ForkingInvokeHalfTimeout(t *testing.T) {

clusterInvoker := registerForking(t, invokers...)

result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
wg.Wait()
}
Loading