diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index e6ffa64d80..6d1b771bf4 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -111,7 +111,7 @@ func setClientGrpool() { } } -// Options ... +// Options is option for create dubbo client type Options struct { // connect timeout ConnectTimeout time.Duration diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 620a57d4a7..1f7d107544 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -65,6 +65,7 @@ type DubboPackage struct { Err error } +// String prints dubbo package detail include header、path、body etc. func (p DubboPackage) String() string { return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body) } diff --git a/protocol/dubbo/config.go b/protocol/dubbo/config.go index 6a1daf857a..635d12109a 100644 --- a/protocol/dubbo/config.go +++ b/protocol/dubbo/config.go @@ -27,7 +27,7 @@ import ( ) type ( - // GettySessionParam ... + // GettySessionParam is session configuration for getty. GettySessionParam struct { CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"` TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"` @@ -47,8 +47,7 @@ type ( SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"` } - // ServerConfig - //Config holds supported types by the multiconfig package + // ServerConfig holds supported types by the multiconfig package ServerConfig struct { // session SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"` @@ -64,8 +63,7 @@ type ( GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` } - // ClientConfig - //Config holds supported types by the multiconfig package + // ClientConfig holds supported types by the multiconfig package ClientConfig struct { ReconnectInterval int `default:"0" yaml:"reconnect_interval" json:"reconnect_interval,omitempty"` @@ -94,7 +92,7 @@ type ( } ) -// GetDefaultClientConfig ... +// GetDefaultClientConfig gets client default configuration. func GetDefaultClientConfig() ClientConfig { return ClientConfig{ ReconnectInterval: 0, @@ -122,7 +120,7 @@ func GetDefaultClientConfig() ClientConfig { }} } -// GetDefaultServerConfig ... +// GetDefaultServerConfig gets server default configuration. func GetDefaultServerConfig() ServerConfig { return ServerConfig{ SessionTimeout: "180s", diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index b7d0a8a268..9eeefd0792 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -33,7 +33,7 @@ import ( // dubbo protocol constant const ( - // DUBBO ... + // DUBBO is dubbo protocol name DUBBO = "dubbo" ) diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index f57d89d1a7..4834459390 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -41,10 +41,9 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) -// todo: WritePkg_Timeout will entry *.yml +// todo: writePkg_Timeout will entry *.yml const ( - // WritePkg_Timeout ... - WritePkg_Timeout = 5 * time.Second + writePkg_Timeout = 5 * time.Second ) var ( @@ -56,10 +55,12 @@ type rpcSession struct { reqNum int32 } +// AddReqNum adds total request number safely func (s *rpcSession) AddReqNum(num int32) { atomic.AddInt32(&s.reqNum, num) } +// GetReqNum gets total request number safely func (s *rpcSession) GetReqNum() int32 { return atomic.LoadInt32(&s.reqNum) } @@ -68,35 +69,35 @@ func (s *rpcSession) GetReqNum() int32 { // RpcClientHandler // ////////////////////////////////////////// -// RpcClientHandler ... +// RpcClientHandler is handler of RPC Client type RpcClientHandler struct { conn *gettyRPCClient } -// NewRpcClientHandler ... +// NewRpcClientHandler creates RpcClientHandler with @gettyRPCClient func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler { return &RpcClientHandler{conn: client} } -// OnOpen ... +// OnOpen notified when RPC client session opened func (h *RpcClientHandler) OnOpen(session getty.Session) error { h.conn.addSession(session) return nil } -// OnError ... +// OnError notified when RPC client session got any error func (h *RpcClientHandler) OnError(session getty.Session, err error) { logger.Warnf("session{%s} got error{%v}, will be closed.", session.Stat(), err) h.conn.removeSession(session) } -// OnClose ... +// OnOpen notified when RPC client session closed func (h *RpcClientHandler) OnClose(session getty.Session) { logger.Infof("session{%s} is closing......", session.Stat()) h.conn.removeSession(session) } -// OnMessage ... +// OnMessage notified when RPC client session got any message in connection func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { p, ok := pkg.(*DubboPackage) if !ok { @@ -141,7 +142,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { } } -// OnCron ... +// OnCron notified when RPC client session got any message in cron job func (h *RpcClientHandler) OnCron(session getty.Session) { clientRpcSession, err := h.conn.getClientRpcSession(session) if err != nil { @@ -163,7 +164,7 @@ func (h *RpcClientHandler) OnCron(session getty.Session) { // RpcServerHandler // ////////////////////////////////////////// -// RpcServerHandler ... +// RpcServerHandler is handler of RPC Server type RpcServerHandler struct { maxSessionNum int sessionTimeout time.Duration @@ -171,7 +172,7 @@ type RpcServerHandler struct { rwlock sync.RWMutex } -// NewRpcServerHandler ... +// NewRpcServerHandler creates RpcServerHandler with @maxSessionNum and @sessionTimeout func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler { return &RpcServerHandler{ maxSessionNum: maxSessionNum, @@ -180,7 +181,7 @@ func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcSe } } -// OnOpen ... +// OnOpen notified when RPC server session opened func (h *RpcServerHandler) OnOpen(session getty.Session) error { var err error h.rwlock.RLock() @@ -199,7 +200,7 @@ func (h *RpcServerHandler) OnOpen(session getty.Session) error { return nil } -// OnError ... +// OnError notified when RPC server session got any error func (h *RpcServerHandler) OnError(session getty.Session, err error) { logger.Warnf("session{%s} got error{%v}, will be closed.", session.Stat(), err) h.rwlock.Lock() @@ -207,7 +208,7 @@ func (h *RpcServerHandler) OnError(session getty.Session, err error) { h.rwlock.Unlock() } -// OnClose ... +// OnOpen notified when RPC server session closed func (h *RpcServerHandler) OnClose(session getty.Session) { logger.Infof("session{%s} is closing......", session.Stat()) h.rwlock.Lock() @@ -215,7 +216,7 @@ func (h *RpcServerHandler) OnClose(session getty.Session) { h.rwlock.Unlock() } -// OnMessage ... +// OnMessage notified when RPC server session got any message in connection func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { h.rwlock.Lock() if _, ok := h.sessionMap[session]; ok { @@ -306,7 +307,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { reply(session, p, hessian.PackageResponse) } -// OnCron ... +// OnCron notified when RPC server session got any message in cron job func (h *RpcServerHandler) OnCron(session getty.Session) { var ( flag bool @@ -363,7 +364,7 @@ func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) { resp.Body = nil } - if err := session.WritePkg(resp, WritePkg_Timeout); err != nil { + if err := session.WritePkg(resp, writePkg_Timeout); err != nil { logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), req.Header) } } diff --git a/protocol/grpc/client.go b/protocol/grpc/client.go index 97cd2216db..a0ab0be80c 100644 --- a/protocol/grpc/client.go +++ b/protocol/grpc/client.go @@ -82,13 +82,13 @@ func init() { } -// Client return grpc connection and warp service stub +// Client is gRPC client include client connection and invoker type Client struct { *grpc.ClientConn invoker reflect.Value } -// NewClient ... +// NewClient creates a new gRPC client. func NewClient(url common.URL) *Client { // if global trace instance was set , it means trace function enabled. If not , will return Nooptracer tracer := opentracing.GlobalTracer() diff --git a/protocol/grpc/grpc_exporter.go b/protocol/grpc/grpc_exporter.go index 0296ad8b5b..79962b59e2 100644 --- a/protocol/grpc/grpc_exporter.go +++ b/protocol/grpc/grpc_exporter.go @@ -33,14 +33,14 @@ type GrpcExporter struct { *protocol.BaseExporter } -// NewGrpcExporter ... +// NewGrpcExporter creates a new gRPC exporter func NewGrpcExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map) *GrpcExporter { return &GrpcExporter{ BaseExporter: protocol.NewBaseExporter(key, invoker, exporterMap), } } -// Unexport ... +// Unexport and unregister gRPC service from registry and memory. func (gg *GrpcExporter) Unexport() { serviceId := gg.GetInvoker().GetUrl().GetParam(constant.BEAN_NAME_KEY, "") interfaceName := gg.GetInvoker().GetUrl().GetParam(constant.INTERFACE_KEY, "") diff --git a/protocol/grpc/grpc_invoker.go b/protocol/grpc/grpc_invoker.go index 78439fcd9b..e150d05e6f 100644 --- a/protocol/grpc/grpc_invoker.go +++ b/protocol/grpc/grpc_invoker.go @@ -35,8 +35,7 @@ import ( "github.com/apache/dubbo-go/protocol" ) -// ErrNoReply ... -var ErrNoReply = errors.New("request need @response") +var errNoReply = errors.New("request need @response") // GrpcInvoker ... type GrpcInvoker struct { @@ -60,7 +59,7 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio ) if invocation.Reply() == nil { - result.Err = ErrNoReply + result.Err = errNoReply } in := []reflect.Value{} diff --git a/protocol/grpc/grpc_protocol.go b/protocol/grpc/grpc_protocol.go index cc4aba10bf..68594a4b35 100644 --- a/protocol/grpc/grpc_protocol.go +++ b/protocol/grpc/grpc_protocol.go @@ -39,14 +39,14 @@ func init() { var grpcProtocol *GrpcProtocol -// GrpcProtocol ... +// GrpcProtocol is gRPC protocol type GrpcProtocol struct { protocol.BaseProtocol serverMap map[string]*Server serverLock sync.Mutex } -// NewGRPCProtocol ... +// NewGRPCProtocol creates new gRPC protocol func NewGRPCProtocol() *GrpcProtocol { return &GrpcProtocol{ BaseProtocol: protocol.NewBaseProtocol(), @@ -54,7 +54,7 @@ func NewGRPCProtocol() *GrpcProtocol { } } -// Export ... +// Export gRPC service for remote invocation func (gp *GrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { url := invoker.GetUrl() serviceKey := url.ServiceKey() @@ -84,7 +84,7 @@ func (gp *GrpcProtocol) openServer(url common.URL) { } } -// Refer ... +// Refer a remote gRPC service func (gp *GrpcProtocol) Refer(url common.URL) protocol.Invoker { invoker := NewGrpcInvoker(url, NewClient(url)) gp.SetInvokers(invoker) @@ -92,7 +92,7 @@ func (gp *GrpcProtocol) Refer(url common.URL) protocol.Invoker { return invoker } -// Destroy ... +// Destroy will destroy gRPC all invoker and exporter, so it only is called once. func (gp *GrpcProtocol) Destroy() { logger.Infof("GrpcProtocol destroy.") @@ -104,7 +104,7 @@ func (gp *GrpcProtocol) Destroy() { } } -// GetProtocol ... +// GetProtocol gets gRPC protocol , will create if null. func GetProtocol() protocol.Protocol { if grpcProtocol == nil { grpcProtocol = NewGRPCProtocol() diff --git a/protocol/grpc/internal/server.go b/protocol/grpc/internal/server.go index a6b861cac6..f7b99fa0ba 100644 --- a/protocol/grpc/internal/server.go +++ b/protocol/grpc/internal/server.go @@ -42,7 +42,7 @@ func (s *server) SayHello(ctx context.Context, in *HelloRequest) (*HelloReply, e return &HelloReply{Message: "Hello " + in.GetName()}, nil } -// InitGrpcServer ... +// InitGrpcServer creates global gRPC server. func InitGrpcServer() { port := ":30000" @@ -57,7 +57,7 @@ func InitGrpcServer() { } } -// ShutdownGrpcServer ... +// ShutdownGrpcServer shuts down gRPC server gracefully func ShutdownGrpcServer() { if s == nil { return diff --git a/protocol/grpc/server.go b/protocol/grpc/server.go index 63783040f9..4017b65dd5 100644 --- a/protocol/grpc/server.go +++ b/protocol/grpc/server.go @@ -37,24 +37,27 @@ import ( "github.com/apache/dubbo-go/protocol" ) -// Server ... +// Server is a gRPC server type Server struct { grpcServer *grpc.Server } -// NewServer ... +// NewServer creates a new server func NewServer() *Server { return &Server{} } -// DubboGrpcService ... +// DubboGrpcService is gRPC service type DubboGrpcService interface { + // SetProxyImpl sets proxy. SetProxyImpl(impl protocol.Invoker) + // GetProxyImpl gets proxy. GetProxyImpl() protocol.Invoker + // ServiceDesc gets an RPC service's specification. ServiceDesc() *grpc.ServiceDesc } -// Start ... +// Start gRPC server with @url func (s *Server) Start(url common.URL) { var ( addr string @@ -106,7 +109,7 @@ func (s *Server) Start(url common.URL) { }() } -// Stop ... +// Stop gRPC server func (s *Server) Stop() { s.grpcServer.Stop() } diff --git a/protocol/invocation.go b/protocol/invocation.go index eedf5f0253..ba5949794c 100644 --- a/protocol/invocation.go +++ b/protocol/invocation.go @@ -21,17 +21,26 @@ import ( "reflect" ) -// Invocation ... +// Invocation is a invocation for each remote method. type Invocation interface { + // MethodName gets invocation method name. MethodName() string + // ParameterTypes gets invocation parameter types. ParameterTypes() []reflect.Type + // ParameterValues gets invocation parameter values. ParameterValues() []reflect.Value + // Arguments gets arguments. Arguments() []interface{} + // Reply gets response of request Reply() interface{} + // Attachments gets all attachments Attachments() map[string]string + // AttachmentsByKey gets attachment by key , if nil then return default value AttachmentsByKey(string, string) string - // Refer to dubbo 2.7.6. It is different from attachment. It is used in internal process. + // Attributes refers to dubbo 2.7.6. It is different from attachment. It is used in internal process. Attributes() map[string]interface{} + // AttributeByKey gets attribute by key , if nil then return default value AttributeByKey(string, interface{}) interface{} + // Invoker gets the invoker in current context. Invoker() Invoker } diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index 68fe7b9204..7f2cede929 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -31,7 +31,7 @@ import ( // /////////////////////////// // todo: is it necessary to separate fields of consumer(provider) from RPCInvocation -// RPCInvocation ... +// nolint type RPCInvocation struct { methodName string parameterTypes []reflect.Type @@ -46,7 +46,7 @@ type RPCInvocation struct { lock sync.RWMutex } -// NewRPCInvocation ... +// NewRPCInvocation creates a RPC invocation. func NewRPCInvocation(methodName string, arguments []interface{}, attachments map[string]string) *RPCInvocation { return &RPCInvocation{ methodName: methodName, @@ -56,7 +56,7 @@ func NewRPCInvocation(methodName string, arguments []interface{}, attachments ma } } -// NewRPCInvocationWithOptions ... +// NewRPCInvocationWithOptions creates a RPC invocation with @opts. func NewRPCInvocationWithOptions(opts ...option) *RPCInvocation { invo := &RPCInvocation{} for _, opt := range opts { @@ -68,42 +68,42 @@ func NewRPCInvocationWithOptions(opts ...option) *RPCInvocation { return invo } -// MethodName ... +// MethodName gets RPC invocation method name. func (r *RPCInvocation) MethodName() string { return r.methodName } -// ParameterTypes ... +// ParameterTypes gets RPC invocation parameter types. func (r *RPCInvocation) ParameterTypes() []reflect.Type { return r.parameterTypes } -// ParameterValues ... +// ParameterValues gets RPC invocation parameter values. func (r *RPCInvocation) ParameterValues() []reflect.Value { return r.parameterValues } -// Arguments ... +// Arguments gets RPC arguments. func (r *RPCInvocation) Arguments() []interface{} { return r.arguments } -// Reply ... +// Reply gets response of RPC request. func (r *RPCInvocation) Reply() interface{} { return r.reply } -// SetReply ... +// SetReply sets response of RPC request. func (r *RPCInvocation) SetReply(reply interface{}) { r.reply = reply } -// Attachments ... +// Attachments gets all attachments of RPC. func (r *RPCInvocation) Attachments() map[string]string { return r.attachments } -// AttachmentsByKey ... +// AttachmentsByKey gets RPC attachment by key , if nil then return default value. func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string { r.lock.RLock() defer r.lock.RUnlock() @@ -117,12 +117,12 @@ func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string return defaultValue } -// get attributes +// Attributes gets all attributes of RPC. func (r *RPCInvocation) Attributes() map[string]interface{} { return r.attributes } -// get attribute by key. If it is not exist, it will return default value +// AttributeByKey gets attribute by @key. If it is not exist, it will return default value. func (r *RPCInvocation) AttributeByKey(key string, defaultValue interface{}) interface{} { r.lock.RLock() defer r.lock.RUnlock() @@ -133,7 +133,7 @@ func (r *RPCInvocation) AttributeByKey(key string, defaultValue interface{}) int return defaultValue } -// SetAttachments ... +// SetAttachments sets attribute by @key and @value. func (r *RPCInvocation) SetAttachments(key string, value string) { r.lock.Lock() defer r.lock.Unlock() @@ -143,29 +143,24 @@ func (r *RPCInvocation) SetAttachments(key string, value string) { r.attachments[key] = value } -// SetAttribute. If Attributes is nil, it will be inited. +// SetAttribute sets attribute by @key and @value. func (r *RPCInvocation) SetAttribute(key string, value interface{}) { r.lock.Lock() defer r.lock.Unlock() r.attributes[key] = value } -// Invoker ... +// Invoker gets the invoker in current context. func (r *RPCInvocation) Invoker() protocol.Invoker { return r.invoker } -// SetInvoker ... -func (r *RPCInvocation) SetInvoker() protocol.Invoker { - return r.invoker -} - -// CallBack ... +// CallBack sets RPC callback method. func (r *RPCInvocation) CallBack() interface{} { return r.callBack } -// SetCallBack ... +// SetCallBack sets RPC callback method. func (r *RPCInvocation) SetCallBack(c interface{}) { r.callBack = c } @@ -176,56 +171,56 @@ func (r *RPCInvocation) SetCallBack(c interface{}) { type option func(invo *RPCInvocation) -// WithMethodName ... +// WithMethodName creates option with @methodName. func WithMethodName(methodName string) option { return func(invo *RPCInvocation) { invo.methodName = methodName } } -// WithParameterTypes ... +// WithParameterTypes creates option with @parameterTypes. func WithParameterTypes(parameterTypes []reflect.Type) option { return func(invo *RPCInvocation) { invo.parameterTypes = parameterTypes } } -// WithParameterValues ... +// WithParameterValues creates option with @parameterValues func WithParameterValues(parameterValues []reflect.Value) option { return func(invo *RPCInvocation) { invo.parameterValues = parameterValues } } -// WithArguments ... +// WithArguments creates option with @arguments function. func WithArguments(arguments []interface{}) option { return func(invo *RPCInvocation) { invo.arguments = arguments } } -// WithReply ... +// WithReply creates option with @reply function. func WithReply(reply interface{}) option { return func(invo *RPCInvocation) { invo.reply = reply } } -// WithCallBack ... +// WithCallBack creates option with @callback function. func WithCallBack(callBack interface{}) option { return func(invo *RPCInvocation) { invo.callBack = callBack } } -// WithAttachments ... +// WithAttachments creates option with @attachments. func WithAttachments(attachments map[string]string) option { return func(invo *RPCInvocation) { invo.attachments = attachments } } -// WithInvoker ... +// WithInvoker creates option with @invoker. func WithInvoker(invoker protocol.Invoker) option { return func(invo *RPCInvocation) { invo.invoker = invoker diff --git a/protocol/invoker.go b/protocol/invoker.go index bb71bab1cf..3ca370479c 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -31,6 +31,7 @@ import ( // Extension - Invoker type Invoker interface { common.Node + // Invoke the invocation and return result. Invoke(context.Context, Invocation) Result } @@ -38,14 +39,14 @@ type Invoker interface { // base invoker ///////////////////////////// -// BaseInvoker ... +// BaseInvoker provides default invoker implement type BaseInvoker struct { url common.URL available bool destroyed bool } -// NewBaseInvoker ... +// NewBaseInvoker creates a new BaseInvoker func NewBaseInvoker(url common.URL) *BaseInvoker { return &BaseInvoker{ url: url, @@ -54,27 +55,27 @@ func NewBaseInvoker(url common.URL) *BaseInvoker { } } -// GetUrl ... +// GetUrl gets base invoker URL func (bi *BaseInvoker) GetUrl() common.URL { return bi.url } -// IsAvailable ... +// IsAvailable gets available flag func (bi *BaseInvoker) IsAvailable() bool { return bi.available } -// IsDestroyed ... +// IsDestroyed gets destroyed flag func (bi *BaseInvoker) IsDestroyed() bool { return bi.destroyed } -// Invoke ... +// Invoke provides default invoker implement func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Result { return &RPCResult{} } -// Destroy ... +// Destroy changes available and destroyed flag func (bi *BaseInvoker) Destroy() { logger.Infof("Destroy invoker: %s", bi.GetUrl().String()) bi.destroyed = true diff --git a/protocol/jsonrpc/http.go b/protocol/jsonrpc/http.go index 70b3abd24f..5fca66d993 100644 --- a/protocol/jsonrpc/http.go +++ b/protocol/jsonrpc/http.go @@ -63,7 +63,7 @@ type Request struct { // HTTP Client // //////////////////////////////////////////// -// HTTPOptions ... +// HTTPOptions is a HTTP option include HandshakeTimeout and HTTPTimeout. type HTTPOptions struct { HandshakeTimeout time.Duration HTTPTimeout time.Duration @@ -74,13 +74,13 @@ var defaultHTTPOptions = HTTPOptions{ HTTPTimeout: 3 * time.Second, } -// HTTPClient ... +// HTTPClient is a HTTP client ,include ID and options. type HTTPClient struct { ID int64 options HTTPOptions } -// NewHTTPClient ... +// NewHTTPClient creates a new HTTP client with HTTPOptions. func NewHTTPClient(opt *HTTPOptions) *HTTPClient { if opt == nil { opt = &defaultHTTPOptions @@ -100,7 +100,7 @@ func NewHTTPClient(opt *HTTPOptions) *HTTPClient { } } -// NewRequest ... +// NewRequest creates a new HTTP request with @service ,@method and @arguments. func (c *HTTPClient) NewRequest(service common.URL, method string, args interface{}) *Request { return &Request{ @@ -114,7 +114,7 @@ func (c *HTTPClient) NewRequest(service common.URL, method string, args interfac } } -// Call ... +// Call makes a HTTP call with @ctx , @service ,@req and @rsp func (c *HTTPClient) Call(ctx context.Context, service common.URL, req *Request, rsp interface{}) error { // header httpHeader := http.Header{} diff --git a/protocol/jsonrpc/json.go b/protocol/jsonrpc/json.go index 3176e19381..389ead9c1a 100644 --- a/protocol/jsonrpc/json.go +++ b/protocol/jsonrpc/json.go @@ -37,7 +37,7 @@ const ( VERSION = "2.0" ) -// CodecData ... +// CodecData is codec data for json RPC. type CodecData struct { ID int64 Method string @@ -64,6 +64,7 @@ type Error struct { Data interface{} `json:"data,omitempty"` } +// Error decodes response error for a string. func (e *Error) Error() string { buf, err := json.Marshal(e) if err != nil { @@ -114,6 +115,7 @@ func newJsonClientCodec() *jsonClientCodec { } } +// Write codec data as byte. func (c *jsonClientCodec) Write(d *CodecData) ([]byte, error) { // If return error: it will be returned as is for this call. // Allow param to be only Array, Slice, Map or Struct. @@ -170,6 +172,7 @@ func (c *jsonClientCodec) Write(d *CodecData) ([]byte, error) { return buf.Bytes(), nil } +// Read bytes as structured data func (c *jsonClientCodec) Read(streamBytes []byte, x interface{}) error { c.rsp.reset() @@ -223,6 +226,7 @@ func (r *serverRequest) reset() { } } +// UnmarshalJSON unmarshals JSON for server request. func (r *serverRequest) UnmarshalJSON(raw []byte) error { r.reset() @@ -281,7 +285,7 @@ type serverResponse struct { Error interface{} `json:"error,omitempty"` } -// ServerCodec ... +// ServerCodec is codec data for request server. type ServerCodec struct { req serverRequest } @@ -296,7 +300,7 @@ func newServerCodec() *ServerCodec { return &ServerCodec{} } -// ReadHeader ... +// ReadHeader reads header and unmarshal to server codec func (c *ServerCodec) ReadHeader(header map[string]string, body []byte) error { if header["HttpMethod"] != "POST" { return &Error{Code: -32601, Message: "Method not found"} @@ -328,7 +332,7 @@ func (c *ServerCodec) ReadHeader(header map[string]string, body []byte) error { return nil } -// ReadBody ... +// ReadBody reads @x as request body. func (c *ServerCodec) ReadBody(x interface{}) error { // If x!=nil and return error e: // - Write() will be called with e.Error() in r.Error @@ -339,7 +343,7 @@ func (c *ServerCodec) ReadBody(x interface{}) error { return nil } - // 在这里把请求参数json 字符串转换成了相应的struct + // the request parameter JSON string is converted to the corresponding struct params := []byte(*c.req.Params) if err := json.Unmarshal(*c.req.Params, x); err != nil { // Note: if c.request.Params is nil it's not an error, it's an optional member. @@ -362,7 +366,7 @@ func (c *ServerCodec) ReadBody(x interface{}) error { return nil } -// NewError ... +// NewError creates a error with @code and @message func NewError(code int, message string) *Error { return &Error{Code: code, Message: message} } @@ -380,6 +384,7 @@ func newError(message string) *Error { } } +// Write responses as byte func (c *ServerCodec) Write(errMsg string, x interface{}) ([]byte, error) { // If return error: nothing happens. // In r.Error will be "" or .Error() of error returned by: diff --git a/protocol/jsonrpc/jsonrpc_exporter.go b/protocol/jsonrpc/jsonrpc_exporter.go index c61cf9adae..6f75f6aeae 100644 --- a/protocol/jsonrpc/jsonrpc_exporter.go +++ b/protocol/jsonrpc/jsonrpc_exporter.go @@ -28,19 +28,19 @@ import ( "github.com/apache/dubbo-go/protocol" ) -// JsonrpcExporter ... +// JsonrpcExporter is JSON RPC exporter and extends from base invoker. type JsonrpcExporter struct { protocol.BaseExporter } -// NewJsonrpcExporter ... +// NewJsonrpcExporter creates JSON RPC exporter with @key, @invoker and @exporterMap func NewJsonrpcExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map) *JsonrpcExporter { return &JsonrpcExporter{ BaseExporter: *protocol.NewBaseExporter(key, invoker, exporterMap), } } -// Unexport ... +// Unexport exported JSON RPC service. func (je *JsonrpcExporter) Unexport() { serviceId := je.GetInvoker().GetUrl().GetParam(constant.BEAN_NAME_KEY, "") interfaceName := je.GetInvoker().GetUrl().GetParam(constant.INTERFACE_KEY, "") diff --git a/protocol/jsonrpc/jsonrpc_invoker.go b/protocol/jsonrpc/jsonrpc_invoker.go index b6e194ce0e..d84b980216 100644 --- a/protocol/jsonrpc/jsonrpc_invoker.go +++ b/protocol/jsonrpc/jsonrpc_invoker.go @@ -29,13 +29,13 @@ import ( invocation_impl "github.com/apache/dubbo-go/protocol/invocation" ) -// JsonrpcInvoker ... +// JsonrpcInvoker is JSON RPC invoker type JsonrpcInvoker struct { protocol.BaseInvoker client *HTTPClient } -// NewJsonrpcInvoker ... +// NewJsonrpcInvoker creates JSON RPC invoker with @url and @client func NewJsonrpcInvoker(url common.URL, client *HTTPClient) *JsonrpcInvoker { return &JsonrpcInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), @@ -43,7 +43,7 @@ func NewJsonrpcInvoker(url common.URL, client *HTTPClient) *JsonrpcInvoker { } } -// Invoke ... +// Invoke the JSON RPC invocation and return result. func (ji *JsonrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { var ( diff --git a/protocol/jsonrpc/jsonrpc_protocol.go b/protocol/jsonrpc/jsonrpc_protocol.go index 64f708652d..90a6bf5ef7 100644 --- a/protocol/jsonrpc/jsonrpc_protocol.go +++ b/protocol/jsonrpc/jsonrpc_protocol.go @@ -44,14 +44,14 @@ func init() { var jsonrpcProtocol *JsonrpcProtocol -// JsonrpcProtocol ... +// JsonrpcProtocol is JSON RPC protocol. type JsonrpcProtocol struct { protocol.BaseProtocol serverMap map[string]*Server serverLock sync.Mutex } -// NewJsonrpcProtocol ... +// NewJsonrpcProtocol creates JSON RPC protocol func NewJsonrpcProtocol() *JsonrpcProtocol { return &JsonrpcProtocol{ BaseProtocol: protocol.NewBaseProtocol(), @@ -59,7 +59,7 @@ func NewJsonrpcProtocol() *JsonrpcProtocol { } } -// Export ... +// Export JSON RPC service for remote invocation func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { url := invoker.GetUrl() serviceKey := strings.TrimPrefix(url.Path, "/") @@ -74,7 +74,7 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { return exporter } -// Refer ... +// Refer a remote JSON PRC service from registry func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker { //default requestTimeout var requestTimeout = config.GetConsumerConfig().RequestTimeout @@ -93,7 +93,7 @@ func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker { return invoker } -// Destroy ... +// Destroy will destroy all invoker and exporter, so it only is called once. func (jp *JsonrpcProtocol) Destroy() { logger.Infof("jsonrpcProtocol destroy.") @@ -125,7 +125,7 @@ func (jp *JsonrpcProtocol) openServer(url common.URL) { } } -// GetProtocol ... +// GetProtocol gets JSON RPC protocol. func GetProtocol() protocol.Protocol { if jsonrpcProtocol == nil { jsonrpcProtocol = NewJsonrpcProtocol() diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index fcea66632e..aa458a1614 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -59,7 +59,7 @@ const ( PathPrefix = byte('/') ) -// Server ... +// Server is JSON RPC server wrapper type Server struct { done chan struct{} once sync.Once @@ -69,7 +69,7 @@ type Server struct { timeout time.Duration } -// NewServer ... +// NewServer creates new JSON RPC server. func NewServer() *Server { return &Server{ done: make(chan struct{}), @@ -228,7 +228,7 @@ func accept(listener net.Listener, fn func(net.Conn)) error { } } -// Start ... +// Start JSON RPC server then ready for accept request. func (s *Server) Start(url common.URL) { listener, err := net.Listen("tcp", url.Location) if err != nil { @@ -255,7 +255,7 @@ func (s *Server) Start(url common.URL) { }() } -// Stop ... +// Stop JSON RPC server, just can be call once. func (s *Server) Stop() { s.once.Do(func() { close(s.done) diff --git a/protocol/protocol.go b/protocol/protocol.go index a873469a8b..6bed5ec4c2 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -29,15 +29,20 @@ import ( // Protocol // Extension - protocol type Protocol interface { + // Export service for remote invocation Export(invoker Invoker) Exporter + // Refer a remote service Refer(url common.URL) Invoker + // Destroy will destroy all invoker and exporter, so it only is called once. Destroy() } // Exporter // wrapping invoker type Exporter interface { + // GetInvoker gets invoker. GetInvoker() Invoker + // Unexport exported service. Unexport() } @@ -45,45 +50,45 @@ type Exporter interface { // base protocol ///////////////////////////// -// BaseProtocol ... +// BaseProtocol is default protocol implement. type BaseProtocol struct { exporterMap *sync.Map invokers []Invoker } -// NewBaseProtocol ... +// NewBaseProtocol creates a new BaseProtocol func NewBaseProtocol() BaseProtocol { return BaseProtocol{ exporterMap: new(sync.Map), } } -// SetExporterMap ... +// SetExporterMap set @exporter with @key to local memory. func (bp *BaseProtocol) SetExporterMap(key string, exporter Exporter) { bp.exporterMap.Store(key, exporter) } -// ExporterMap ... +// ExporterMap gets exporter map. func (bp *BaseProtocol) ExporterMap() *sync.Map { return bp.exporterMap } -// SetInvokers ... +// SetInvokers sets invoker into local memory func (bp *BaseProtocol) SetInvokers(invoker Invoker) { bp.invokers = append(bp.invokers, invoker) } -// Invokers ... +// Invokers gets all invokers func (bp *BaseProtocol) Invokers() []Invoker { return bp.invokers } -// Export ... +// Export is default export implement. func (bp *BaseProtocol) Export(invoker Invoker) Exporter { return NewBaseExporter("base", invoker, bp.exporterMap) } -// Refer ... +// Refer is default refer implement. func (bp *BaseProtocol) Refer(url common.URL) Invoker { return NewBaseInvoker(url) } @@ -113,14 +118,14 @@ func (bp *BaseProtocol) Destroy() { // base exporter ///////////////////////////// -// BaseExporter ... +// BaseExporter is default exporter implement. type BaseExporter struct { key string invoker Invoker exporterMap *sync.Map } -// NewBaseExporter ... +// NewBaseExporter creates a new BaseExporter func NewBaseExporter(key string, invoker Invoker, exporterMap *sync.Map) *BaseExporter { return &BaseExporter{ key: key, @@ -129,13 +134,13 @@ func NewBaseExporter(key string, invoker Invoker, exporterMap *sync.Map) *BaseEx } } -// GetInvoker ... +// GetInvoker gets invoker func (de *BaseExporter) GetInvoker() Invoker { return de.invoker } -// Unexport ... +// Unexport exported service. func (de *BaseExporter) Unexport() { logger.Infof("Exporter unexport.") de.invoker.Destroy() diff --git a/protocol/protocolwrapper/mock_protocol_filter.go b/protocol/protocolwrapper/mock_protocol_filter.go index dedf8aa64b..8763c20410 100644 --- a/protocol/protocolwrapper/mock_protocol_filter.go +++ b/protocol/protocolwrapper/mock_protocol_filter.go @@ -28,19 +28,22 @@ import ( type mockProtocolFilter struct{} -// NewMockProtocolFilter ... +// NewMockProtocolFilter creates a new mock protocol func NewMockProtocolFilter() protocol.Protocol { return &mockProtocolFilter{} } +// Export mock service for remote invocation func (pfw *mockProtocolFilter) Export(invoker protocol.Invoker) protocol.Exporter { return protocol.NewBaseExporter("key", invoker, &sync.Map{}) } +// Refer a mock remote service func (pfw *mockProtocolFilter) Refer(url common.URL) protocol.Invoker { return protocol.NewBaseInvoker(url) } +// Destroy will do nothing func (pfw *mockProtocolFilter) Destroy() { } diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index 70d2da0fae..cba1d5d5bc 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -31,7 +31,7 @@ import ( ) const ( - // FILTER ... + // FILTER is protocol key. FILTER = "filter" ) @@ -45,7 +45,7 @@ type ProtocolFilterWrapper struct { protocol protocol.Protocol } -// Export ... +// Export service for remote invocation func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Exporter { if pfw.protocol == nil { pfw.protocol = extension.GetProtocol(invoker.GetUrl().Protocol) @@ -54,7 +54,7 @@ func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Expo return pfw.protocol.Export(invoker) } -// Refer ... +// Refer a remote service func (pfw *ProtocolFilterWrapper) Refer(url common.URL) protocol.Invoker { if pfw.protocol == nil { pfw.protocol = extension.GetProtocol(url.Protocol) @@ -62,7 +62,7 @@ func (pfw *ProtocolFilterWrapper) Refer(url common.URL) protocol.Invoker { return buildInvokerChain(pfw.protocol.Refer(url), constant.REFERENCE_FILTER_KEY) } -// Destroy ... +// Destroy will destroy all invoker and exporter. func (pfw *ProtocolFilterWrapper) Destroy() { pfw.protocol.Destroy() } diff --git a/protocol/result.go b/protocol/result.go index 34e76d2ddd..2e7a6e492a 100644 --- a/protocol/result.go +++ b/protocol/result.go @@ -17,15 +17,23 @@ package protocol -// Result ... +// Result is a RPC result type Result interface { + // SetError sets error. SetError(error) + // Error gets error. Error() error + // SetResult sets invoker result. SetResult(interface{}) + // Result gets invoker result. Result() interface{} + // SetAttachments replaces the existing attachments with the specified param. SetAttachments(map[string]string) + // Attachments gets all attachments Attachments() map[string]string + // AddAttachment adds the specified map to existing attachments in this instance. AddAttachment(string, string) + // Attachment gets attachment by key with default value. Attachment(string, string) string } @@ -33,48 +41,49 @@ type Result interface { // Result Impletment of RPC ///////////////////////////// -// RPCResult ... +// RPCResult is default RPC result. type RPCResult struct { Attrs map[string]string Err error Rest interface{} } -// SetError ... +// SetError sets error. func (r *RPCResult) SetError(err error) { r.Err = err } +// Error gets error. func (r *RPCResult) Error() error { return r.Err } -// SetResult ... +// SetResult sets invoker result. func (r *RPCResult) SetResult(rest interface{}) { r.Rest = rest } -// Result ... +// Result gets invoker result. func (r *RPCResult) Result() interface{} { return r.Rest } -// SetAttachments ... +// SetAttachments replaces the existing attachments with the specified param. func (r *RPCResult) SetAttachments(attr map[string]string) { r.Attrs = attr } -// Attachments ... +// Attachments gets all attachments func (r *RPCResult) Attachments() map[string]string { return r.Attrs } -// AddAttachment ... +// AddAttachment adds the specified map to existing attachments in this instance. func (r *RPCResult) AddAttachment(key, value string) { r.Attrs[key] = value } -// Attachment ... +// Attachment gets attachment by key with default value. func (r *RPCResult) Attachment(key, defaultValue string) string { v, ok := r.Attrs[key] if !ok { diff --git a/protocol/rpc_status.go b/protocol/rpc_status.go index 13be47c98e..0eeca6c1bb 100644 --- a/protocol/rpc_status.go +++ b/protocol/rpc_status.go @@ -32,7 +32,7 @@ var ( serviceStatistic sync.Map // url -> RPCStatus ) -// RPCStatus ... +// RPCStatus is URL statistics. type RPCStatus struct { active int32 failed int32 @@ -46,63 +46,63 @@ type RPCStatus struct { lastRequestFailedTimestamp int64 } -// GetActive ... +// GetActive gets active. func (rpc *RPCStatus) GetActive() int32 { return atomic.LoadInt32(&rpc.active) } -// GetFailed ... +// GetFailed gets failed. func (rpc *RPCStatus) GetFailed() int32 { return atomic.LoadInt32(&rpc.failed) } -// GetTotal ... +// GetTotal gets total. func (rpc *RPCStatus) GetTotal() int32 { return atomic.LoadInt32(&rpc.total) } -// GetTotalElapsed ... +// GetTotalElapsed gets total elapsed. func (rpc *RPCStatus) GetTotalElapsed() int64 { return atomic.LoadInt64(&rpc.totalElapsed) } -// GetFailedElapsed ... +// GetFailedElapsed gets failed elapsed. func (rpc *RPCStatus) GetFailedElapsed() int64 { return atomic.LoadInt64(&rpc.failedElapsed) } -// GetMaxElapsed ... +// GetMaxElapsed gets max elapsed. func (rpc *RPCStatus) GetMaxElapsed() int64 { return atomic.LoadInt64(&rpc.maxElapsed) } -// GetFailedMaxElapsed ... +// GetFailedMaxElapsed gets failed max elapsed. func (rpc *RPCStatus) GetFailedMaxElapsed() int64 { return atomic.LoadInt64(&rpc.failedMaxElapsed) } -// GetSucceededMaxElapsed ... +// GetSucceededMaxElapsed gets succeeded max elapsed. func (rpc *RPCStatus) GetSucceededMaxElapsed() int64 { return atomic.LoadInt64(&rpc.succeededMaxElapsed) } -// GetLastRequestFailedTimestamp ... +// GetLastRequestFailedTimestamp gets last request failed timestamp. func (rpc *RPCStatus) GetLastRequestFailedTimestamp() int64 { return atomic.LoadInt64(&rpc.lastRequestFailedTimestamp) } -// GetSuccessiveRequestFailureCount ... +// GetSuccessiveRequestFailureCount gets successive request failure count. func (rpc *RPCStatus) GetSuccessiveRequestFailureCount() int32 { return atomic.LoadInt32(&rpc.successiveRequestFailureCount) } -// GetURLStatus ... +// GetURLStatus get URL RPC status. func GetURLStatus(url common.URL) *RPCStatus { rpcStatus, _ := serviceStatistic.LoadOrStore(url.Key(), &RPCStatus{}) return rpcStatus.(*RPCStatus) } -// GetMethodStatus ... +// GetMethodStatus get method RPC status. func GetMethodStatus(url common.URL, methodName string) *RPCStatus { identifier := url.Key() methodMap, found := methodStatistics.Load(identifier) @@ -122,13 +122,13 @@ func GetMethodStatus(url common.URL, methodName string) *RPCStatus { return status } -// BeginCount ... +// BeginCount gets begin count. func BeginCount(url common.URL, methodName string) { beginCount0(GetURLStatus(url)) beginCount0(GetMethodStatus(url, methodName)) } -// EndCount ... +// EndCount gets end count. func EndCount(url common.URL, methodName string, elapsed int64, succeeded bool) { endCount0(GetURLStatus(url), elapsed, succeeded) endCount0(GetMethodStatus(url, methodName), elapsed, succeeded) @@ -163,7 +163,7 @@ func endCount0(rpcStatus *RPCStatus, elapsed int64, succeeded bool) { } } -// CurrentTimeMillis ... +// CurrentTimeMillis get current timestamp func CurrentTimeMillis() int64 { return time.Now().UnixNano() / int64(time.Millisecond) }