From 5fc95d16614d382214a86f89a57ce6622c3adf2d Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 9 Nov 2020 17:57:59 +0100 Subject: [PATCH 1/3] WIP Websocket protocol binding Signed-off-by: Francesco Guardiani --- protocol/ws/v2/client_protocol.go | 96 ++++++++++++++ protocol/ws/v2/context.go | 15 +++ protocol/ws/v2/doc.go | 4 + protocol/ws/v2/go.mod | 10 ++ protocol/ws/v2/go.sum | 108 ++++++++++++++++ protocol/ws/v2/server_protocol.go | 7 ++ protocol/ws/v2/subprotocols.go | 22 ++++ protocol/ws/v2/write_message.go | 41 ++++++ samples/README.md | 4 +- samples/ws/client/main.go | 65 ++++++++++ samples/ws/go.mod | 13 ++ samples/ws/go.sum | 153 +++++++++++++++++++++++ v2/binding/generic_structured_message.go | 37 ++++++ 13 files changed, 574 insertions(+), 1 deletion(-) create mode 100644 protocol/ws/v2/client_protocol.go create mode 100644 protocol/ws/v2/context.go create mode 100644 protocol/ws/v2/doc.go create mode 100644 protocol/ws/v2/go.mod create mode 100644 protocol/ws/v2/go.sum create mode 100644 protocol/ws/v2/server_protocol.go create mode 100644 protocol/ws/v2/subprotocols.go create mode 100644 protocol/ws/v2/write_message.go create mode 100644 samples/ws/client/main.go create mode 100644 samples/ws/go.mod create mode 100644 samples/ws/go.sum create mode 100644 v2/binding/generic_structured_message.go diff --git a/protocol/ws/v2/client_protocol.go b/protocol/ws/v2/client_protocol.go new file mode 100644 index 000000000..9bd91ec45 --- /dev/null +++ b/protocol/ws/v2/client_protocol.go @@ -0,0 +1,96 @@ +package v2 + +import ( + "context" + "fmt" + "io" + "io/ioutil" + + "nhooyr.io/websocket" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/protocol" +) + +type ClientProtocol struct { + conn *websocket.Conn + + format format.Format + messageType websocket.MessageType +} + +// Dial wraps websocket.Dial and creates the ClientProtocol. +func Dial(ctx context.Context, u string, opts *websocket.DialOptions) (*ClientProtocol, error) { + if opts == nil { + opts = &websocket.DialOptions{} + } + opts.Subprotocols = SupportedSubprotocols + c, _, err := websocket.Dial(ctx, u, opts) + if err != nil { + return nil, err + } + return NewClientProtocol(c) +} + +// NewClientProtocol wraps a websocket.Conn in a type that implements protocol.Receiver, protocol.Sender and protocol.Closer. +func NewClientProtocol(c *websocket.Conn) (*ClientProtocol, error) { + format, messageType, err := resolveFormat(c.Subprotocol()) + if err != nil { + return nil, err + } + return &ClientProtocol{ + conn: c, + format: format, + messageType: messageType, + }, nil +} + +func (c ClientProtocol) Send(ctx context.Context, m binding.Message, transformers ...binding.Transformer) error { + writer, err := c.conn.Writer(ctx, c.messageType) + if err != nil { + return err + } + return WriteWriter(ctx, m, writer, transformers...) +} + +func (c ClientProtocol) Receive(ctx context.Context) (binding.Message, error) { + messageType, reader, err := c.conn.Reader(ctx) + if err == io.EOF { + return nil, io.EOF + } + if err != nil { + return nil, err + } + + if messageType != c.messageType { + // We need to consume the stream, otherwise it won't be possible to consume the stream + consumeStream(reader) + return nil, fmt.Errorf("wrong message type: %s, expected %s", messageType, c.messageType) + } + + return binding.NewStructuredMessage(c.format, reader), nil +} + +func consumeStream(reader io.Reader) { + //TODO is there a less expensive way to consume the stream? + ioutil.ReadAll(reader) +} + +func (c ClientProtocol) Close(ctx context.Context) error { + statusCode := websocket.StatusNormalClosure + if val := ctx.Value(codeKey{}); val != nil { + statusCode = val.(websocket.StatusCode) + } + + reason := "" + if val := ctx.Value(reasonKey{}); val != nil { + reason = val.(string) + } + + return c.conn.Close(statusCode, reason) +} + +var _ protocol.Receiver = ClientProtocol{} +var _ protocol.Sender = ClientProtocol{} +var _ protocol.Closer = ClientProtocol{} diff --git a/protocol/ws/v2/context.go b/protocol/ws/v2/context.go new file mode 100644 index 000000000..f47ab3b8c --- /dev/null +++ b/protocol/ws/v2/context.go @@ -0,0 +1,15 @@ +package v2 + +import ( + "context" + + "nhooyr.io/websocket" +) + +type codeKey struct{} + +type reasonKey struct{} + +func WithCloseReason(ctx context.Context, code websocket.StatusCode, reason string) context.Context { + return context.WithValue(context.WithValue(ctx, codeKey{}, code), reasonKey{}, reason) +} diff --git a/protocol/ws/v2/doc.go b/protocol/ws/v2/doc.go new file mode 100644 index 000000000..5a81024a6 --- /dev/null +++ b/protocol/ws/v2/doc.go @@ -0,0 +1,4 @@ +/* +Package ws implements the Websocket protocol binding +*/ +package v2 diff --git a/protocol/ws/v2/go.mod b/protocol/ws/v2/go.mod new file mode 100644 index 000000000..f8cdb2ef4 --- /dev/null +++ b/protocol/ws/v2/go.mod @@ -0,0 +1,10 @@ +module github.com/cloudevents/sdk-go/protocol/ws/v2 + +go 1.14 + +replace github.com/cloudevents/sdk-go/v2 => ../../../v2 + +require ( + github.com/cloudevents/sdk-go/v2 v2.0.0 + nhooyr.io/websocket v1.8.6 // indirect +) diff --git a/protocol/ws/v2/go.sum b/protocol/ws/v2/go.sum new file mode 100644 index 000000000..2f026408f --- /dev/null +++ b/protocol/ws/v2/go.sum @@ -0,0 +1,108 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= +github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8= +github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac/go.mod h1:Frd2bnT3w5FB5q49ENTfVlztJES+1k/7lyWX2+9gq/M= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.2/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +go.opencensus.io v0.22.0 h1:C9hSCOW830chIVkdja34wa6Ky+IzWllkUinR+BtRZd4= +go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k= +nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= diff --git a/protocol/ws/v2/server_protocol.go b/protocol/ws/v2/server_protocol.go new file mode 100644 index 000000000..92a20800b --- /dev/null +++ b/protocol/ws/v2/server_protocol.go @@ -0,0 +1,7 @@ +package v2 + +import "nhooyr.io/websocket" + +type serverProtocol struct { + conn *websocket.Conn +} diff --git a/protocol/ws/v2/subprotocols.go b/protocol/ws/v2/subprotocols.go new file mode 100644 index 000000000..cb7d7af3f --- /dev/null +++ b/protocol/ws/v2/subprotocols.go @@ -0,0 +1,22 @@ +package v2 + +import ( + "fmt" + + "nhooyr.io/websocket" + + "github.com/cloudevents/sdk-go/v2/binding/format" +) + +const JsonSubprotocol = "cloudevents.json" + +var SupportedSubprotocols = []string{JsonSubprotocol} + +func resolveFormat(subprotocol string) (format.Format, websocket.MessageType, error) { + switch subprotocol { + case "cloudevents.json": + return format.JSON, websocket.MessageText, nil + default: + return nil, websocket.MessageText, fmt.Errorf("subprotocol not supported: %s", subprotocol) + } +} diff --git a/protocol/ws/v2/write_message.go b/protocol/ws/v2/write_message.go new file mode 100644 index 000000000..f3b8f55bf --- /dev/null +++ b/protocol/ws/v2/write_message.go @@ -0,0 +1,41 @@ +package v2 + +import ( + "context" + "io" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" +) + +// WriteWriter fills the provided writer with the bindings.Message m. +// Using context you can tweak the encoding processing (more details on binding.Write documentation). +func WriteWriter(ctx context.Context, m binding.Message, writer io.WriteCloser, transformers ...binding.Transformer) error { + structuredWriter := &wsMessageWriter{writer} + + _, err := binding.Write( + ctx, + m, + structuredWriter, + nil, + transformers..., + ) + return err +} + +type wsMessageWriter struct { + io.WriteCloser +} + +func (w *wsMessageWriter) SetStructuredEvent(_ context.Context, _ format.Format, event io.Reader) error { + _, err := io.Copy(w.WriteCloser, event) + if err != nil { + // Try to close anyway + _ = w.WriteCloser.Close() + return err + } + + return w.WriteCloser.Close() +} + +var _ binding.StructuredWriter = (*wsMessageWriter)(nil) // Test it conforms to the interface diff --git a/samples/README.md b/samples/README.md index 33d0d47c5..dba78e141 100644 --- a/samples/README.md +++ b/samples/README.md @@ -35,4 +35,6 @@ You can grab them and copy-paste in your project to start using sdk-go. * STAN * [Receiver](./stan/receiver): Receive events using the CloudEvents Client. * [Sender](./stan/sender): Receive events using the CloudEvents Client. - * [Sender & Receiver](./stan/sender-receiver): Send and receive events using the same NATS client. \ No newline at end of file + * [Sender & Receiver](./stan/sender-receiver): Send and receive events using the same NATS client. +* WebSockets + * [Client](./ws/client): Sends and receive events, from client side, using the CloudEvents Client. \ No newline at end of file diff --git a/samples/ws/client/main.go b/samples/ws/client/main.go new file mode 100644 index 000000000..eef4859ec --- /dev/null +++ b/samples/ws/client/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + "log" + "sync" + "sync/atomic" + + cews "github.com/cloudevents/sdk-go/protocol/ws/v2" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +func main() { + ctx := context.Background() + s, err := cews.Dial(ctx, "http://localhost:8080", nil) + if err != nil { + log.Fatalf("failed to dial: %v", err) + } + + defer s.Close(ctx) + + c, err := cloudevents.NewClient(s, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + if err != nil { + log.Fatalf("failed to create client: %v", err) + } + + var wg sync.WaitGroup + wg.Add(2) + go func() { + for i := 0; i < 10; i++ { + e := cloudevents.NewEvent() + e.SetType("com.cloudevents.sample.sent") + e.SetSource("https://github.com/cloudevents/sdk-go/v2/samples/stan/sender") + _ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "id": i, + "message": "Hello, World!", + }) + + if result := c.Send(context.Background(), e); cloudevents.IsUndelivered(result) { + log.Printf("Failed to send: %v", err) + } else { + log.Printf("Sent: %d, accepted: %t", i, cloudevents.IsACK(result)) + } + } + wg.Done() + }() + go func() { + received := uint32(0) + ctx, cancel := context.WithCancel(ctx) + err := c.StartReceiver(ctx, func(event cloudevents.Event) { + log.Printf("Received event:\n%v", event) + if atomic.AddUint32(&received, 1) == 10 { + cancel() + } + }) + if err != nil { + log.Printf("failed to start receiver: %v", err) + } else { + <-ctx.Done() + } + wg.Done() + }() + + wg.Wait() +} diff --git a/samples/ws/go.mod b/samples/ws/go.mod new file mode 100644 index 000000000..be2f87656 --- /dev/null +++ b/samples/ws/go.mod @@ -0,0 +1,13 @@ +module github.com/cloudevents/sdk-go/samples/ws + +go 1.14 + +replace ( + github.com/cloudevents/sdk-go/protocol/ws/v2 => ../../protocol/ws/v2 + github.com/cloudevents/sdk-go/v2 => ../../v2 +) + +require ( + github.com/cloudevents/sdk-go/protocol/ws/v2 v2.0.0 + github.com/cloudevents/sdk-go/v2 v2.0.0 +) diff --git a/samples/ws/go.sum b/samples/ws/go.sum new file mode 100644 index 000000000..45975057e --- /dev/null +++ b/samples/ws/go.sum @@ -0,0 +1,153 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= +github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= +github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= +github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= +github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= +github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk= +github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8= +github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac h1:+2b6iGRJe3hvV/yVXrd41yVEjxuFHxasJqDhkIjS4gk= +github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac/go.mod h1:Frd2bnT3w5FB5q49ENTfVlztJES+1k/7lyWX2+9gq/M= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.2 h1:uqH7bpe+ERSiDa34FDOF7RikN6RzXgduUF8yarlZp94= +github.com/onsi/ginkgo v1.10.2/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +go.opencensus.io v0.22.0 h1:C9hSCOW830chIVkdja34wa6Ky+IzWllkUinR+BtRZd4= +go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0= +golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k= +nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= diff --git a/v2/binding/generic_structured_message.go b/v2/binding/generic_structured_message.go new file mode 100644 index 000000000..6c2136b96 --- /dev/null +++ b/v2/binding/generic_structured_message.go @@ -0,0 +1,37 @@ +package binding + +import ( + "context" + "io" + + "github.com/cloudevents/sdk-go/v2/binding/format" +) + +type genericStructuredMessage struct { + format format.Format + reader io.Reader +} + +// NewStructuredMessage wraps a format and an io.Reader returning an implementation of Message +// This message *cannot* be read several times safely +func NewStructuredMessage(format format.Format, reader io.Reader) *genericStructuredMessage { + return &genericStructuredMessage{reader: reader, format: format} +} + +var _ Message = (*genericStructuredMessage)(nil) + +func (m *genericStructuredMessage) ReadEncoding() Encoding { + return EncodingStructured +} + +func (m *genericStructuredMessage) ReadStructured(ctx context.Context, encoder StructuredWriter) error { + return encoder.SetStructuredEvent(ctx, m.format, m.reader) +} + +func (m *genericStructuredMessage) ReadBinary(ctx context.Context, encoder BinaryWriter) error { + return ErrNotBinary +} + +func (m *genericStructuredMessage) Finish(err error) error { + return nil +} From dce71bdea7c4568b9d8d4941070bacab8dbb0834 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 10 Nov 2020 11:40:35 +0100 Subject: [PATCH 2/3] Tests implemented Cleanup some stuff Signed-off-by: Francesco Guardiani --- protocol/ws/v2/client_protocol.go | 77 +++++++++---- protocol/ws/v2/client_protocol_test.go | 101 ++++++++++++++++++ protocol/ws/v2/go.mod | 3 +- protocol/ws/v2/go.sum | 11 ++ protocol/ws/v2/server_protocol.go | 7 -- protocol/ws/v2/subprotocols_test.go | 37 +++++++ protocol/ws/v2/write_message.go | 41 ------- samples/ws/client/main.go | 6 +- .../structured_message.go} | 22 ++-- v2/binding/utils/structured_message_test.go | 29 +++++ v2/binding/utils/write_structured_message.go | 46 ++++++++ .../utils/write_structured_message_test.go | 68 ++++++++++++ v2/protocol/inbound.go | 4 +- 13 files changed, 369 insertions(+), 83 deletions(-) create mode 100644 protocol/ws/v2/client_protocol_test.go delete mode 100644 protocol/ws/v2/server_protocol.go create mode 100644 protocol/ws/v2/subprotocols_test.go delete mode 100644 protocol/ws/v2/write_message.go rename v2/binding/{generic_structured_message.go => utils/structured_message.go} (61%) create mode 100644 v2/binding/utils/structured_message_test.go create mode 100644 v2/binding/utils/write_structured_message.go create mode 100644 v2/binding/utils/write_structured_message_test.go diff --git a/protocol/ws/v2/client_protocol.go b/protocol/ws/v2/client_protocol.go index 9bd91ec45..cd1b6683b 100644 --- a/protocol/ws/v2/client_protocol.go +++ b/protocol/ws/v2/client_protocol.go @@ -2,22 +2,32 @@ package v2 import ( "context" + "errors" "fmt" "io" "io/ioutil" + "sync" "nhooyr.io/websocket" "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/utils" "github.com/cloudevents/sdk-go/v2/protocol" ) +// ClientProtocol implements protocol.Receiver, protocol.Sender and protocol.Closer. +// Note: when you use client.StartReceiver with this protocol, you can use just one +// goroutine to poll this protocol, because the protocol itself cannot handle multiple +// received messages at same time (WS has no multiplexing!) type ClientProtocol struct { conn *websocket.Conn format format.Format messageType websocket.MessageType + + receiverLock sync.Mutex + connOwned bool // whether this protocol created the connection } // Dial wraps websocket.Dial and creates the ClientProtocol. @@ -30,33 +40,55 @@ func Dial(ctx context.Context, u string, opts *websocket.DialOptions) (*ClientPr if err != nil { return nil, err } - return NewClientProtocol(c) + p, err := NewClientProtocol(c) + if err != nil { + return nil, err + } + p.connOwned = true + return p, nil } // NewClientProtocol wraps a websocket.Conn in a type that implements protocol.Receiver, protocol.Sender and protocol.Closer. +// Look at ClientProtocol for more details. func NewClientProtocol(c *websocket.Conn) (*ClientProtocol, error) { - format, messageType, err := resolveFormat(c.Subprotocol()) + f, messageType, err := resolveFormat(c.Subprotocol()) if err != nil { return nil, err } return &ClientProtocol{ conn: c, - format: format, + format: f, messageType: messageType, + connOwned: false, }, nil } -func (c ClientProtocol) Send(ctx context.Context, m binding.Message, transformers ...binding.Transformer) error { +func (c *ClientProtocol) Send(ctx context.Context, m binding.Message, transformers ...binding.Transformer) error { writer, err := c.conn.Writer(ctx, c.messageType) if err != nil { return err } - return WriteWriter(ctx, m, writer, transformers...) + return utils.WriteStructured(ctx, m, writer, transformers...) +} + +func (c *ClientProtocol) Receive(ctx context.Context) (binding.Message, error) { + c.receiverLock.Lock() + m, err := c.UnsafeReceive(ctx) + if m != nil { + m = binding.WithFinish(m, func(err error) { + c.receiverLock.Unlock() + }) + } else { + c.receiverLock.Unlock() + } + return m, err } -func (c ClientProtocol) Receive(ctx context.Context) (binding.Message, error) { +// UnsafeReceive is like Receive, except it doesn't guard from multiple invocations +// from different goroutines. +func (c *ClientProtocol) UnsafeReceive(ctx context.Context) (binding.Message, error) { messageType, reader, err := c.conn.Reader(ctx) - if err == io.EOF { + if errors.Is(err, io.EOF) || errors.Is(err, websocket.CloseError{}) || (ctx.Err() != nil && errors.Is(err, ctx.Err())) { return nil, io.EOF } if err != nil { @@ -69,7 +101,7 @@ func (c ClientProtocol) Receive(ctx context.Context) (binding.Message, error) { return nil, fmt.Errorf("wrong message type: %s, expected %s", messageType, c.messageType) } - return binding.NewStructuredMessage(c.format, reader), nil + return utils.NewStructuredMessage(c.format, reader), nil } func consumeStream(reader io.Reader) { @@ -77,20 +109,23 @@ func consumeStream(reader io.Reader) { ioutil.ReadAll(reader) } -func (c ClientProtocol) Close(ctx context.Context) error { - statusCode := websocket.StatusNormalClosure - if val := ctx.Value(codeKey{}); val != nil { - statusCode = val.(websocket.StatusCode) - } +func (c *ClientProtocol) Close(ctx context.Context) error { + if c.connOwned { + statusCode := websocket.StatusNormalClosure + if val := ctx.Value(codeKey{}); val != nil { + statusCode = val.(websocket.StatusCode) + } - reason := "" - if val := ctx.Value(reasonKey{}); val != nil { - reason = val.(string) - } + reason := "" + if val := ctx.Value(reasonKey{}); val != nil { + reason = val.(string) + } - return c.conn.Close(statusCode, reason) + return c.conn.Close(statusCode, reason) + } + return nil } -var _ protocol.Receiver = ClientProtocol{} -var _ protocol.Sender = ClientProtocol{} -var _ protocol.Closer = ClientProtocol{} +var _ protocol.Receiver = (*ClientProtocol)(nil) +var _ protocol.Sender = (*ClientProtocol)(nil) +var _ protocol.Closer = (*ClientProtocol)(nil) diff --git a/protocol/ws/v2/client_protocol_test.go b/protocol/ws/v2/client_protocol_test.go new file mode 100644 index 000000000..19bc9d92d --- /dev/null +++ b/protocol/ws/v2/client_protocol_test.go @@ -0,0 +1,101 @@ +package v2 + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" + "nhooyr.io/websocket" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/client" + . "github.com/cloudevents/sdk-go/v2/test" +) + +func pingEvent() cloudevents.Event { + event := cloudevents.NewEvent() + event.SetID("1") + event.SetType("ping") + event.SetSource("localhost") + return event +} + +func TestClientProtocolPingPong(t *testing.T) { + server := httptest.NewServer(PingPongHandler(t)) + defer server.Close() + + p, err := Dial(context.TODO(), server.URL, nil) + require.NoError(t, err) + + ping := pingEvent() + require.NoError(t, p.Send(context.TODO(), binding.ToMessage(&ping))) + + receivedMessage, err := p.Receive(context.TODO()) + require.NoError(t, err) + + pong, err := binding.ToEvent(context.TODO(), receivedMessage) + require.NoError(t, err) + + AssertEvent(t, *pong, HasId("2"), HasType("pong")) + + require.NoError(t, p.Close(context.TODO())) +} + +func TestClientProtocolPingPongWithClient(t *testing.T) { + server := httptest.NewServer(PingPongHandler(t)) + defer server.Close() + + p, err := Dial(context.TODO(), server.URL, nil) + require.NoError(t, err) + + c, err := cloudevents.NewClient(p, client.WithPollGoroutines(1)) + require.NoError(t, err) + + ping := pingEvent() + require.NoError(t, c.Send(context.TODO(), ping)) + + ctx, cancelFn := context.WithCancel(context.TODO()) + var received atomic.Value + // Start receiver closes the connection when stopped! + err = c.StartReceiver(ctx, func(event cloudevents.Event) { + received.Store(event) + cancelFn() + }) + require.NoError(t, err) + + pong, ok := received.Load().(cloudevents.Event) + require.True(t, ok) + + AssertEvent(t, pong, HasId("2"), HasType("pong")) +} + +func PingPongHandler(t *testing.T) http.HandlerFunc { + return func(writer http.ResponseWriter, request *http.Request) { + c, err := websocket.Accept(writer, request, &websocket.AcceptOptions{Subprotocols: SupportedSubprotocols}) + + require.NoError(t, err) + require.Equal(t, JsonSubprotocol, c.Subprotocol()) + + messageType, b, err := c.Read(context.TODO()) + require.NoError(t, err) + require.Equal(t, websocket.MessageText, messageType) + + var ping cloudevents.Event + require.NoError(t, json.Unmarshal(b, &ping)) + AssertEvent(t, ping, HasId("1"), HasType("ping")) + + pong := ping.Clone() + pong.SetID("2") + pong.SetType("pong") + + require.NoError(t, c.Write(context.TODO(), websocket.MessageText, MustJSON(t, pong))) + + ctx := c.CloseRead(context.TODO()) + <-ctx.Done() + } +} diff --git a/protocol/ws/v2/go.mod b/protocol/ws/v2/go.mod index f8cdb2ef4..d612615ba 100644 --- a/protocol/ws/v2/go.mod +++ b/protocol/ws/v2/go.mod @@ -6,5 +6,6 @@ replace github.com/cloudevents/sdk-go/v2 => ../../../v2 require ( github.com/cloudevents/sdk-go/v2 v2.0.0 - nhooyr.io/websocket v1.8.6 // indirect + github.com/stretchr/testify v1.5.1 + nhooyr.io/websocket v1.8.6 ) diff --git a/protocol/ws/v2/go.sum b/protocol/ws/v2/go.sum index 2f026408f..384de3c0d 100644 --- a/protocol/ws/v2/go.sum +++ b/protocol/ws/v2/go.sum @@ -3,6 +3,7 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= @@ -22,11 +23,14 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk= github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -36,6 +40,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac h1:+2b6iGRJe3hvV/yVXrd41yVEjxuFHxasJqDhkIjS4gk= github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac/go.mod h1:Frd2bnT3w5FB5q49ENTfVlztJES+1k/7lyWX2+9gq/M= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -45,18 +50,23 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.10.2/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= go.opencensus.io v0.22.0 h1:C9hSCOW830chIVkdja34wa6Ky+IzWllkUinR+BtRZd4= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -102,6 +112,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k= diff --git a/protocol/ws/v2/server_protocol.go b/protocol/ws/v2/server_protocol.go deleted file mode 100644 index 92a20800b..000000000 --- a/protocol/ws/v2/server_protocol.go +++ /dev/null @@ -1,7 +0,0 @@ -package v2 - -import "nhooyr.io/websocket" - -type serverProtocol struct { - conn *websocket.Conn -} diff --git a/protocol/ws/v2/subprotocols_test.go b/protocol/ws/v2/subprotocols_test.go new file mode 100644 index 000000000..f7b288ab4 --- /dev/null +++ b/protocol/ws/v2/subprotocols_test.go @@ -0,0 +1,37 @@ +package v2 + +import ( + "testing" + + "github.com/stretchr/testify/require" + "nhooyr.io/websocket" + + "github.com/cloudevents/sdk-go/v2/binding/format" +) + +func Test_resolveFormat(t *testing.T) { + tests := []struct { + name string + subprotocol string + wantFormat format.Format + wantMessageType websocket.MessageType + }{{ + name: "JSON subprotocol", + subprotocol: JsonSubprotocol, + wantFormat: format.JSON, + wantMessageType: websocket.MessageText, + }} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fmt, messageType, err := resolveFormat(tt.subprotocol) + require.NoError(t, err) + require.Equal(t, tt.wantFormat, fmt) + require.Equal(t, tt.wantMessageType, messageType) + }) + } +} + +func Test_resolveFormat_error(t *testing.T) { + _, _, err := resolveFormat("lalala") + require.Error(t, err, "subprotocol not supported: lalala") +} diff --git a/protocol/ws/v2/write_message.go b/protocol/ws/v2/write_message.go deleted file mode 100644 index f3b8f55bf..000000000 --- a/protocol/ws/v2/write_message.go +++ /dev/null @@ -1,41 +0,0 @@ -package v2 - -import ( - "context" - "io" - - "github.com/cloudevents/sdk-go/v2/binding" - "github.com/cloudevents/sdk-go/v2/binding/format" -) - -// WriteWriter fills the provided writer with the bindings.Message m. -// Using context you can tweak the encoding processing (more details on binding.Write documentation). -func WriteWriter(ctx context.Context, m binding.Message, writer io.WriteCloser, transformers ...binding.Transformer) error { - structuredWriter := &wsMessageWriter{writer} - - _, err := binding.Write( - ctx, - m, - structuredWriter, - nil, - transformers..., - ) - return err -} - -type wsMessageWriter struct { - io.WriteCloser -} - -func (w *wsMessageWriter) SetStructuredEvent(_ context.Context, _ format.Format, event io.Reader) error { - _, err := io.Copy(w.WriteCloser, event) - if err != nil { - // Try to close anyway - _ = w.WriteCloser.Close() - return err - } - - return w.WriteCloser.Close() -} - -var _ binding.StructuredWriter = (*wsMessageWriter)(nil) // Test it conforms to the interface diff --git a/samples/ws/client/main.go b/samples/ws/client/main.go index eef4859ec..575aa6169 100644 --- a/samples/ws/client/main.go +++ b/samples/ws/client/main.go @@ -12,14 +12,12 @@ import ( func main() { ctx := context.Background() - s, err := cews.Dial(ctx, "http://localhost:8080", nil) + p, err := cews.Dial(ctx, "http://localhost:8080", nil) if err != nil { log.Fatalf("failed to dial: %v", err) } - defer s.Close(ctx) - - c, err := cloudevents.NewClient(s, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + c, err := cloudevents.NewClient(p, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) if err != nil { log.Fatalf("failed to create client: %v", err) } diff --git a/v2/binding/generic_structured_message.go b/v2/binding/utils/structured_message.go similarity index 61% rename from v2/binding/generic_structured_message.go rename to v2/binding/utils/structured_message.go index 6c2136b96..e62c8ed93 100644 --- a/v2/binding/generic_structured_message.go +++ b/v2/binding/utils/structured_message.go @@ -1,9 +1,10 @@ -package binding +package utils import ( "context" "io" + "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/binding/format" ) @@ -18,20 +19,25 @@ func NewStructuredMessage(format format.Format, reader io.Reader) *genericStruct return &genericStructuredMessage{reader: reader, format: format} } -var _ Message = (*genericStructuredMessage)(nil) +var _ binding.Message = (*genericStructuredMessage)(nil) -func (m *genericStructuredMessage) ReadEncoding() Encoding { - return EncodingStructured +func (m *genericStructuredMessage) ReadEncoding() binding.Encoding { + return binding.EncodingStructured } -func (m *genericStructuredMessage) ReadStructured(ctx context.Context, encoder StructuredWriter) error { +func (m *genericStructuredMessage) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error { return encoder.SetStructuredEvent(ctx, m.format, m.reader) } -func (m *genericStructuredMessage) ReadBinary(ctx context.Context, encoder BinaryWriter) error { - return ErrNotBinary +func (m *genericStructuredMessage) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error { + return binding.ErrNotBinary } func (m *genericStructuredMessage) Finish(err error) error { - return nil + if closer, ok := m.reader.(io.ReadCloser); ok { + if err2 := closer.Close(); err2 != nil { + return err2 + } + } + return err } diff --git a/v2/binding/utils/structured_message_test.go b/v2/binding/utils/structured_message_test.go new file mode 100644 index 000000000..f2cba2b91 --- /dev/null +++ b/v2/binding/utils/structured_message_test.go @@ -0,0 +1,29 @@ +package utils_test + +import ( + "bytes" + "context" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/utils" + "github.com/cloudevents/sdk-go/v2/test" +) + +func TestNewStructuredMessage(t *testing.T) { + testEvent := test.ConvertEventExtensionsToString(t, test.FullEvent()) + jsonBytes := test.MustJSON(t, testEvent) + + message := utils.NewStructuredMessage(format.JSON, ioutil.NopCloser(bytes.NewReader(jsonBytes))) + + require.Equal(t, binding.EncodingStructured, message.ReadEncoding()) + + event := test.MustToEvent(t, context.TODO(), message) + test.AssertEventEquals(t, testEvent, event) + + require.NoError(t, message.Finish(nil)) +} diff --git a/v2/binding/utils/write_structured_message.go b/v2/binding/utils/write_structured_message.go new file mode 100644 index 000000000..c3d5f2b26 --- /dev/null +++ b/v2/binding/utils/write_structured_message.go @@ -0,0 +1,46 @@ +package utils + +import ( + "context" + "io" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" +) + +// WriteStructured fills the provided io.Writer with the binding.Message m in structured mode. +// Using context you can tweak the encoding processing (more details on binding.Write documentation). +func WriteStructured(ctx context.Context, m binding.Message, writer io.Writer, transformers ...binding.Transformer) error { + _, err := binding.Write( + ctx, + m, + wsMessageWriter{writer}, + nil, + transformers..., + ) + return err +} + +type wsMessageWriter struct { + io.Writer +} + +func (w wsMessageWriter) SetStructuredEvent(_ context.Context, _ format.Format, event io.Reader) error { + _, err := io.Copy(w.Writer, event) + if err != nil { + // Try to close anyway + _ = w.tryToClose() + return err + } + + return w.tryToClose() +} + +func (w wsMessageWriter) tryToClose() error { + if closer, ok := w.Writer.(io.WriteCloser); ok { + return closer.Close() + } + return nil +} + +var _ binding.StructuredWriter = wsMessageWriter{} // Test it conforms to the interface diff --git a/v2/binding/utils/write_structured_message_test.go b/v2/binding/utils/write_structured_message_test.go new file mode 100644 index 000000000..57df5677f --- /dev/null +++ b/v2/binding/utils/write_structured_message_test.go @@ -0,0 +1,68 @@ +package utils_test + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "testing" + + "github.com/stretchr/testify/require" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/utils" + "github.com/cloudevents/sdk-go/v2/test" +) + +func TestWriteStructured(t *testing.T) { + testEvent := test.ConvertEventExtensionsToString(t, test.FullEvent()) + testMessage := binding.ToMessage(&testEvent) + + var buffer bytes.Buffer + err := utils.WriteStructured(context.TODO(), testMessage, &buffer) + require.NoError(t, err) + + haveEvent := cloudevents.Event{} + require.NoError(t, json.Unmarshal(buffer.Bytes(), &haveEvent)) + test.AssertEventEquals(t, testEvent, haveEvent) +} + +func TestPipeStructured(t *testing.T) { + testEvent := test.ConvertEventExtensionsToString(t, test.FullEvent()) + jsonBytes := test.MustJSON(t, testEvent) + + message := utils.NewStructuredMessage(format.JSON, bytes.NewReader(jsonBytes)) + defer message.Finish(nil) + + var buffer bytes.Buffer + err := utils.WriteStructured(context.TODO(), message, &buffer) + require.NoError(t, err) + + haveEvent := cloudevents.Event{} + require.NoError(t, json.Unmarshal(buffer.Bytes(), &haveEvent)) + test.AssertEventEquals(t, testEvent, haveEvent) +} + +func TestWriteStructuredWithWriteCloser(t *testing.T) { + wantErr := errors.New("writer mock error") + + testEvent := test.ConvertEventExtensionsToString(t, test.FullEvent()) + testMessage := binding.ToMessage(&testEvent) + + haveErr := utils.WriteStructured(context.TODO(), testMessage, writeCloserMock{wantErr}) + require.Equal(t, wantErr, haveErr) +} + +type writeCloserMock struct { + error +} + +func (w writeCloserMock) Write(p []byte) (n int, err error) { + return len(p), nil +} + +func (w writeCloserMock) Close() error { + return w.error +} diff --git a/v2/protocol/inbound.go b/v2/protocol/inbound.go index e67ed8acd..39181771f 100644 --- a/v2/protocol/inbound.go +++ b/v2/protocol/inbound.go @@ -9,6 +9,7 @@ import ( // Receiver receives messages. type Receiver interface { // Receive blocks till a message is received or ctx expires. + // Receive can be invoked safely from different goroutines. // // A non-nil error means the receiver is closed. // io.EOF means it closed cleanly, any other value indicates an error. @@ -29,7 +30,8 @@ type ResponseFn func(ctx context.Context, m binding.Message, r Result, transform // Responder receives messages and is given a callback to respond. type Responder interface { - // Receive blocks till a message is received or ctx expires. + // Respond blocks till a message is received or ctx expires. + // Respond can be invoked safely from different goroutines. // // A non-nil error means the receiver is closed. // io.EOF means it closed cleanly, any other value indicates an error. From c351351e484186123f618de645052ab8e78d22d2 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 13 Nov 2020 09:31:08 +0100 Subject: [PATCH 3/3] Suggestions Signed-off-by: Francesco Guardiani --- protocol/ws/v2/client_protocol_test.go | 6 +++--- protocol/ws/v2/subprotocols_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/protocol/ws/v2/client_protocol_test.go b/protocol/ws/v2/client_protocol_test.go index 19bc9d92d..7c45b3f20 100644 --- a/protocol/ws/v2/client_protocol_test.go +++ b/protocol/ws/v2/client_protocol_test.go @@ -26,7 +26,7 @@ func pingEvent() cloudevents.Event { } func TestClientProtocolPingPong(t *testing.T) { - server := httptest.NewServer(PingPongHandler(t)) + server := httptest.NewServer(pingPongHandler(t)) defer server.Close() p, err := Dial(context.TODO(), server.URL, nil) @@ -47,7 +47,7 @@ func TestClientProtocolPingPong(t *testing.T) { } func TestClientProtocolPingPongWithClient(t *testing.T) { - server := httptest.NewServer(PingPongHandler(t)) + server := httptest.NewServer(pingPongHandler(t)) defer server.Close() p, err := Dial(context.TODO(), server.URL, nil) @@ -74,7 +74,7 @@ func TestClientProtocolPingPongWithClient(t *testing.T) { AssertEvent(t, pong, HasId("2"), HasType("pong")) } -func PingPongHandler(t *testing.T) http.HandlerFunc { +func pingPongHandler(t *testing.T) http.HandlerFunc { return func(writer http.ResponseWriter, request *http.Request) { c, err := websocket.Accept(writer, request, &websocket.AcceptOptions{Subprotocols: SupportedSubprotocols}) diff --git a/protocol/ws/v2/subprotocols_test.go b/protocol/ws/v2/subprotocols_test.go index f7b288ab4..27040f0e4 100644 --- a/protocol/ws/v2/subprotocols_test.go +++ b/protocol/ws/v2/subprotocols_test.go @@ -9,7 +9,7 @@ import ( "github.com/cloudevents/sdk-go/v2/binding/format" ) -func Test_resolveFormat(t *testing.T) { +func TestResolveFormat(t *testing.T) { tests := []struct { name string subprotocol string @@ -31,7 +31,7 @@ func Test_resolveFormat(t *testing.T) { } } -func Test_resolveFormat_error(t *testing.T) { +func TestResolveFormatError(t *testing.T) { _, _, err := resolveFormat("lalala") require.Error(t, err, "subprotocol not supported: lalala") }