Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sudo: required
go:
- 1.12.x
- 1.13.x

env:
- GO111MODULE=on
script:
- go test -v -race ./...
56 changes: 56 additions & 0 deletions adapter/dubbo/consumer_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package dubbo

import (
"context"
"github.com/apache/dubbo-go/protocol"
)
import (
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
)

type consumerFilter struct{}

func (d *consumerFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
methodResourceName := getResourceName(invoker, invocation, getConsumerPrefix())
interfaceResourceName := ""
if getInterfaceGroupAndVersionEnabled() {
interfaceResourceName = getColonSeparatedKey(invoker.GetUrl())
} else {
interfaceResourceName = invoker.GetUrl().Service()
}
var (
interfaceEntry *base.SentinelEntry
methodEntry *base.SentinelEntry
b *base.BlockError
)

if !isAsync(invocation) {
interfaceEntry, b = sentinel.Entry(interfaceResourceName, sentinel.WithResourceType(base.ResTypeRPC), sentinel.WithTrafficType(base.Outbound))
if b != nil { // blocked
return consumerDubboFallback(ctx, invoker, invocation, b)
}
methodEntry, b = sentinel.Entry(methodResourceName, sentinel.WithResourceType(base.ResTypeRPC), sentinel.WithTrafficType(base.Outbound), sentinel.WithArgs(invocation.Attachments()))
if b != nil { // blocked
return consumerDubboFallback(ctx, invoker, invocation, b)
}
} else {
// TODO : Need to implement asynchronous current limiting
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What're the differences between async and sync mode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mainly to see that in Java code, async entry is used instead of entry to restrict current. At this stage, Dubbo go has not added async field in attachment, so it will not come to async logic

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. Sentinel Go has unified entry and asyncEntry, so we could use sentinel.Entry(args) for both sync and async scenarios.

// unlimited flow for the time being
}
ctx = context.WithValue(ctx, InterfaceEntryKey, interfaceEntry)
ctx = context.WithValue(ctx, MethodEntryKey, methodEntry)
return invoker.Invoke(ctx, invocation)
}

func (d *consumerFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
if methodEntry := ctx.Value(MethodEntryKey); methodEntry != nil {
// TODO traceEntry()
methodEntry.(*base.SentinelEntry).Exit()
}
if interfaceEntry := ctx.Value(InterfaceEntryKey); interfaceEntry != nil {
// TODO traceEntry()
interfaceEntry.(*base.SentinelEntry).Exit()
}
return result
}
28 changes: 28 additions & 0 deletions adapter/dubbo/consumer_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package dubbo

import (
"context"
"testing"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/stretchr/testify/assert"
)

func TestConsumerFilter_Invoke(t *testing.T) {
f := GetConsumerFilter()
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
mockInvocation := invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]string, 0))
result := f.Invoke(context.TODO(), mockInvoker, mockInvocation)
assert.NoError(t, result.Error())
// todo more testing code
}
31 changes: 31 additions & 0 deletions adapter/dubbo/fallback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package dubbo

import (
"context"
"github.com/apache/dubbo-go/protocol"
)
import (
"github.com/alibaba/sentinel-golang/core/base"
)

var (
consumerDubboFallback = getDefaultDubboFallback()
providerDubboFallback = getDefaultDubboFallback()
)

type DubboFallback func(context.Context, protocol.Invoker, protocol.Invocation, *base.BlockError) protocol.Result

func SetConsumerDubboFallback(f DubboFallback) {
consumerDubboFallback = f
}
func SetProviderDubboFallback(f DubboFallback) {
providerDubboFallback = f
}
func getDefaultDubboFallback() DubboFallback {
return func(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, blockError *base.BlockError) protocol.Result {
result := &protocol.RPCResult{}
result.SetResult(nil)
result.SetError(blockError)
return result
}
}
19 changes: 19 additions & 0 deletions adapter/dubbo/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package dubbo

import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/filter"
)

func init() {
extension.SetFilter(ProviderFilterName, GetProviderFilter)
extension.SetFilter(ConsumerFilterName, GetConsumerFilter)
}

func GetConsumerFilter() filter.Filter {
return &consumerFilter{}
}

func GetProviderFilter() filter.Filter {
return &providerFilter{}
}
50 changes: 50 additions & 0 deletions adapter/dubbo/provider_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package dubbo

import (
"context"
"github.com/apache/dubbo-go/protocol"
)
import (
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
)

type providerFilter struct{}

func (d *providerFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
methodResourceName := getResourceName(invoker, invocation, getProviderPrefix())
interfaceResourceName := ""
if getInterfaceGroupAndVersionEnabled() {
interfaceResourceName = getColonSeparatedKey(invoker.GetUrl())
} else {
interfaceResourceName = invoker.GetUrl().Service()
}
var (
interfaceEntry *base.SentinelEntry
methodEntry *base.SentinelEntry
b *base.BlockError
)
interfaceEntry, b = sentinel.Entry(interfaceResourceName, sentinel.WithResourceType(base.ResTypeRPC), sentinel.WithTrafficType(base.Inbound))
if b != nil { // blocked
return providerDubboFallback(ctx, invoker, invocation, b)
}
methodEntry, b = sentinel.Entry(methodResourceName, sentinel.WithResourceType(base.ResTypeRPC), sentinel.WithTrafficType(base.Inbound), sentinel.WithArgs(invocation.Attachments()))
if b != nil { // blocked
return providerDubboFallback(ctx, invoker, invocation, b)
}
ctx = context.WithValue(ctx, InterfaceEntryKey, interfaceEntry)
ctx = context.WithValue(ctx, MethodEntryKey, methodEntry)
return invoker.Invoke(ctx, invocation)
}

func (d *providerFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
if methodEntry := ctx.Value(MethodEntryKey); methodEntry != nil {
// TODO traceEntry()
methodEntry.(*base.SentinelEntry).Exit()
}
if interfaceEntry := ctx.Value(InterfaceEntryKey); interfaceEntry != nil {
// TODO traceEntry()
interfaceEntry.(*base.SentinelEntry).Exit()
}
return result
}
28 changes: 28 additions & 0 deletions adapter/dubbo/provider_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package dubbo

import (
"context"
"testing"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/stretchr/testify/assert"
)

func TestProviderFilter_Invoke(t *testing.T) {
f := GetProviderFilter()
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
mockInvocation := invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]string, 0))
result := f.Invoke(context.TODO(), mockInvoker, mockInvocation)
assert.NoError(t, result.Error())
// todo more testing code
}
70 changes: 70 additions & 0 deletions adapter/dubbo/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package dubbo

import (
"bytes"
"fmt"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
)

const (
ProviderFilterName = "sentinel-provider"
ConsumerFilterName = "sentinel-consumer"

DefaultProviderPrefix = "dubbo:provider:"
DefaultConsumerPrefix = "dubbo:consumer:"

MethodEntryKey = "dubboMethodEntry"
InterfaceEntryKey = "dubboInterfaceEntry"
)

// Currently, a ConcurrentHashMap mechanism is missing.
// All values are filled with default values first.

func getResourceName(invoker protocol.Invoker, invocation protocol.Invocation, prefix string) string {
var (
buf bytes.Buffer
interfaceResource string
)
buf.WriteString(prefix)
if getInterfaceGroupAndVersionEnabled() {
interfaceResource = getColonSeparatedKey(invoker.GetUrl())
} else {
interfaceResource = invoker.GetUrl().Service()
}
buf.WriteString(interfaceResource)
buf.WriteString(":")
buf.WriteString(invocation.MethodName())
buf.WriteString("(")
isFirst := true
for _, v := range invocation.ParameterTypes() {
if !isFirst {
buf.WriteString(",")
}
buf.WriteString(v.Name())
isFirst = false
}
buf.WriteString(")")
return buf.String()
}
func getConsumerPrefix() string {
return DefaultConsumerPrefix
}
func getProviderPrefix() string {
return DefaultProviderPrefix
}
func getInterfaceGroupAndVersionEnabled() bool {
return true
}
func getColonSeparatedKey(url common.URL) string {
return fmt.Sprintf("%s:%s:%s",
url.Service(),
url.GetParam(constant.GROUP_KEY, ""),
url.GetParam(constant.VERSION_KEY, ""))
}
func isAsync(invocation protocol.Invocation) bool {
return invocation.Attachments()["async"] == "true"
}
25 changes: 25 additions & 0 deletions adapter/dubbo/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package dubbo

import (
"testing"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/stretchr/testify/assert"
)

func TestGetResourceName(t *testing.T) {
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"version=1.0.0&group=myGroup&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
methodResourceName := getResourceName(mockInvoker,
invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]string, 0)), "prefix_")
assert.Equal(t, "prefix_com.ikurento.user.UserProvider:myGroup:1.0.0:hello()", methodResourceName)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/apache/dubbo-go v0.1.2-0.20200224151332-dd1a3c24d656
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/pkg/errors v0.8.1
github.com/shirou/gopsutil v2.19.12+incompatible
Expand Down
Loading