Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mod: update the comments in protocol directory #602

Merged
merged 5 commits into from
Jun 14, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion protocol/dubbo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func setClientGrpool() {
}
}

// Options ...
// Options is option for create dubbo client
type Options struct {
// connect timeout
ConnectTimeout time.Duration
Expand Down
1 change: 1 addition & 0 deletions protocol/dubbo/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type DubboPackage struct {
Err error
}

// String prints dubbo package detail include header、path、body etc.
func (p DubboPackage) String() string {
return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
}
Expand Down
12 changes: 5 additions & 7 deletions protocol/dubbo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

type (
// GettySessionParam ...
// GettySessionParam is session configuration for getty.
GettySessionParam struct {
CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"`
TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"`
Expand All @@ -47,8 +47,7 @@ type (
SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"`
}

// ServerConfig
//Config holds supported types by the multiconfig package
// ServerConfig holds supported types by the multiconfig package
ServerConfig struct {
// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
Expand All @@ -64,8 +63,7 @@ type (
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
}

// ClientConfig
//Config holds supported types by the multiconfig package
// ClientConfig holds supported types by the multiconfig package
ClientConfig struct {
ReconnectInterval int `default:"0" yaml:"reconnect_interval" json:"reconnect_interval,omitempty"`

Expand Down Expand Up @@ -94,7 +92,7 @@ type (
}
)

// GetDefaultClientConfig ...
// GetDefaultClientConfig gets client default configuration.
func GetDefaultClientConfig() ClientConfig {
return ClientConfig{
ReconnectInterval: 0,
Expand Down Expand Up @@ -122,7 +120,7 @@ func GetDefaultClientConfig() ClientConfig {
}}
}

// GetDefaultServerConfig ...
// GetDefaultServerConfig gets server default configuration.
func GetDefaultServerConfig() ServerConfig {
return ServerConfig{
SessionTimeout: "180s",
Expand Down
2 changes: 1 addition & 1 deletion protocol/dubbo/dubbo_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

// dubbo protocol constant
const (
// DUBBO ...
// DUBBO is dubbo protocol name
DUBBO = "dubbo"
)

Expand Down
37 changes: 19 additions & 18 deletions protocol/dubbo/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)

// todo: WritePkg_Timeout will entry *.yml
// todo: writePkg_Timeout will entry *.yml
const (
// WritePkg_Timeout ...
WritePkg_Timeout = 5 * time.Second
writePkg_Timeout = 5 * time.Second
)

var (
Expand All @@ -56,10 +55,12 @@ type rpcSession struct {
reqNum int32
}

// AddReqNum adds total request number safely
func (s *rpcSession) AddReqNum(num int32) {
atomic.AddInt32(&s.reqNum, num)
}

// GetReqNum gets total request number safely
func (s *rpcSession) GetReqNum() int32 {
return atomic.LoadInt32(&s.reqNum)
}
Expand All @@ -68,35 +69,35 @@ func (s *rpcSession) GetReqNum() int32 {
// RpcClientHandler
// //////////////////////////////////////////

// RpcClientHandler ...
// RpcClientHandler is handler of RPC Client
type RpcClientHandler struct {
conn *gettyRPCClient
}

// NewRpcClientHandler ...
// NewRpcClientHandler creates RpcClientHandler with @gettyRPCClient
func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler {
return &RpcClientHandler{conn: client}
}

// OnOpen ...
// OnOpen notified when RPC client session opened
func (h *RpcClientHandler) OnOpen(session getty.Session) error {
h.conn.addSession(session)
return nil
}

// OnError ...
// OnError notified when RPC client session got any error
func (h *RpcClientHandler) OnError(session getty.Session, err error) {
logger.Warnf("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.conn.removeSession(session)
}

// OnClose ...
// OnOpen notified when RPC client session closed
func (h *RpcClientHandler) OnClose(session getty.Session) {
logger.Infof("session{%s} is closing......", session.Stat())
h.conn.removeSession(session)
}

// OnMessage ...
// OnMessage notified when RPC client session got any message in connection
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
p, ok := pkg.(*DubboPackage)
if !ok {
Expand Down Expand Up @@ -141,7 +142,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
}
}

// OnCron ...
// OnCron notified when RPC client session got any message in cron job
func (h *RpcClientHandler) OnCron(session getty.Session) {
clientRpcSession, err := h.conn.getClientRpcSession(session)
if err != nil {
Expand All @@ -163,15 +164,15 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
// RpcServerHandler
// //////////////////////////////////////////

// RpcServerHandler ...
// RpcServerHandler is handler of RPC Server
type RpcServerHandler struct {
maxSessionNum int
sessionTimeout time.Duration
sessionMap map[getty.Session]*rpcSession
rwlock sync.RWMutex
}

// NewRpcServerHandler ...
// NewRpcServerHandler creates RpcServerHandler with @maxSessionNum and @sessionTimeout
func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler {
return &RpcServerHandler{
maxSessionNum: maxSessionNum,
Expand All @@ -180,7 +181,7 @@ func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcSe
}
}

// OnOpen ...
// OnOpen notified when RPC server session opened
func (h *RpcServerHandler) OnOpen(session getty.Session) error {
var err error
h.rwlock.RLock()
Expand All @@ -199,23 +200,23 @@ func (h *RpcServerHandler) OnOpen(session getty.Session) error {
return nil
}

// OnError ...
// OnError notified when RPC server session got any error
func (h *RpcServerHandler) OnError(session getty.Session, err error) {
logger.Warnf("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}

// OnClose ...
// OnOpen notified when RPC server session closed
func (h *RpcServerHandler) OnClose(session getty.Session) {
logger.Infof("session{%s} is closing......", session.Stat())
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}

// OnMessage ...
// OnMessage notified when RPC server session got any message in connection
func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
h.rwlock.Lock()
if _, ok := h.sessionMap[session]; ok {
Expand Down Expand Up @@ -306,7 +307,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
reply(session, p, hessian.PackageResponse)
}

// OnCron ...
// OnCron notified when RPC server session got any message in cron job
func (h *RpcServerHandler) OnCron(session getty.Session) {
var (
flag bool
Expand Down Expand Up @@ -363,7 +364,7 @@ func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) {
resp.Body = nil
}

if err := session.WritePkg(resp, WritePkg_Timeout); err != nil {
if err := session.WritePkg(resp, writePkg_Timeout); err != nil {
logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), req.Header)
}
}
4 changes: 2 additions & 2 deletions protocol/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ import (
"github.com/apache/dubbo-go/config"
)

// Client ...
// Client is gRPC client include client connection and invoker
type Client struct {
*grpc.ClientConn
invoker reflect.Value
}

// NewClient ...
// NewClient creates a new gRPC client.
func NewClient(url common.URL) *Client {
// if global trace instance was set , it means trace function enabled. If not , will return Nooptracer
tracer := opentracing.GlobalTracer()
Expand Down
4 changes: 2 additions & 2 deletions protocol/grpc/grpc_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ type GrpcExporter struct {
*protocol.BaseExporter
}

// NewGrpcExporter ...
// NewGrpcExporter creates a new gRPC exporter
func NewGrpcExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map) *GrpcExporter {
return &GrpcExporter{
BaseExporter: protocol.NewBaseExporter(key, invoker, exporterMap),
}
}

// Unexport ...
// Unexport and unregister gRPC service from registry and memory.
func (gg *GrpcExporter) Unexport() {
serviceId := gg.GetInvoker().GetUrl().GetParam(constant.BEAN_NAME_KEY, "")
interfaceName := gg.GetInvoker().GetUrl().GetParam(constant.INTERFACE_KEY, "")
Expand Down
5 changes: 2 additions & 3 deletions protocol/grpc/grpc_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ import (
"github.com/apache/dubbo-go/protocol"
)

// ErrNoReply ...
var ErrNoReply = errors.New("request need @response")
var errNoReply = errors.New("request need @response")

// GrpcInvoker ...
type GrpcInvoker struct {
Expand All @@ -60,7 +59,7 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio
)

if invocation.Reply() == nil {
result.Err = ErrNoReply
result.Err = errNoReply
}

in := []reflect.Value{}
Expand Down
12 changes: 6 additions & 6 deletions protocol/grpc/grpc_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,22 @@ func init() {

var grpcProtocol *GrpcProtocol

// GrpcProtocol ...
// GrpcProtocol is gRPC protocol
type GrpcProtocol struct {
protocol.BaseProtocol
serverMap map[string]*Server
serverLock sync.Mutex
}

// NewGRPCProtocol ...
// NewGRPCProtocol creates new gRPC protocol
func NewGRPCProtocol() *GrpcProtocol {
return &GrpcProtocol{
BaseProtocol: protocol.NewBaseProtocol(),
serverMap: make(map[string]*Server),
}
}

// Export ...
// Export gRPC service for remote invocation
func (gp *GrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
url := invoker.GetUrl()
serviceKey := url.ServiceKey()
Expand Down Expand Up @@ -84,15 +84,15 @@ func (gp *GrpcProtocol) openServer(url common.URL) {
}
}

// Refer ...
// Refer a remote gRPC service
func (gp *GrpcProtocol) Refer(url common.URL) protocol.Invoker {
invoker := NewGrpcInvoker(url, NewClient(url))
gp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
return invoker
}

// Destroy ...
// Destroy will destroy gRPC all invoker and exporter, so it only is called once.
func (gp *GrpcProtocol) Destroy() {
logger.Infof("GrpcProtocol destroy.")

Expand All @@ -104,7 +104,7 @@ func (gp *GrpcProtocol) Destroy() {
}
}

// GetProtocol ...
// GetProtocol gets gRPC protocol , will create if null.
func GetProtocol() protocol.Protocol {
if grpcProtocol == nil {
grpcProtocol = NewGRPCProtocol()
Expand Down
4 changes: 2 additions & 2 deletions protocol/grpc/internal/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *server) SayHello(ctx context.Context, in *HelloRequest) (*HelloReply, e
return &HelloReply{Message: "Hello " + in.GetName()}, nil
}

// InitGrpcServer ...
// InitGrpcServer creates global gRPC server.
func InitGrpcServer() {
port := ":30000"

Expand All @@ -57,7 +57,7 @@ func InitGrpcServer() {
}
}

// ShutdownGrpcServer ...
// ShutdownGrpcServer shuts down gRPC server gracefully
func ShutdownGrpcServer() {
if s == nil {
return
Expand Down
13 changes: 8 additions & 5 deletions protocol/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,27 @@ import (
"github.com/apache/dubbo-go/protocol"
)

// Server ...
// Server is a gRPC server
type Server struct {
grpcServer *grpc.Server
}

// NewServer ...
// NewServer creates a new server
func NewServer() *Server {
return &Server{}
}

// DubboGrpcService ...
// DubboGrpcService is gRPC service
type DubboGrpcService interface {
// SetProxyImpl sets proxy.
SetProxyImpl(impl protocol.Invoker)
// GetProxyImpl gets proxy.
GetProxyImpl() protocol.Invoker
// ServiceDesc gets an RPC service's specification.
ServiceDesc() *grpc.ServiceDesc
}

// Start ...
// Start gRPC server with @url
func (s *Server) Start(url common.URL) {
var (
addr string
Expand Down Expand Up @@ -106,7 +109,7 @@ func (s *Server) Start(url common.URL) {
}()
}

// Stop ...
// Stop gRPC server
func (s *Server) Stop() {
s.grpcServer.Stop()
}
Loading