Skip to content

Commit

Permalink
Fix grpc reconnect logic (#401)
Browse files Browse the repository at this point in the history
  • Loading branch information
dhurley authored Jul 24, 2023
1 parent 88ad4b3 commit 236d2be
Show file tree
Hide file tree
Showing 22 changed files with 306 additions and 97 deletions.
17 changes: 1 addition & 16 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,6 @@ github.com/go-kit/log v0.2.0 h1:7i2K3eKTos3Vc0enKCfnVcgHh2olr/MyfboYq7cAcFw=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw=
Expand Down Expand Up @@ -1210,7 +1209,6 @@ github.com/kisielk/errcheck v1.5.0 h1:e8esj/e4R+SAOwFwN+n3zr0nYeCyeweozKfO23MvHz
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.15.13/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
Expand Down Expand Up @@ -1301,7 +1299,6 @@ github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0Mw
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml/v2 v2.0.5/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas=
github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek=
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
github.com/phayes/checkstyle v0.0.0-20170904204023-bfd46e6a821d h1:CdDQnGF8Nq9ocOS/xlSptM1N3BbrA6/kmaep5ggwaIA=
github.com/phayes/checkstyle v0.0.0-20170904204023-bfd46e6a821d/go.mod h1:3OzsM7FXDQlpCiw2j81fOmAwQLnZnLGXVKUzeKQXIAw=
Expand All @@ -1319,6 +1316,7 @@ github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDf
github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk=
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
Expand Down Expand Up @@ -1379,12 +1377,10 @@ github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B
github.com/spf13/afero v1.3.3/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4=
github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU=
github.com/spf13/cobra v1.1.1/go.mod h1:WnodtKOvamDL/PwE2M4iKs8aMDBZ5Q5klgD3qfVJQMI=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spf13/viper v1.14.0/go.mod h1:WT//axPky3FdvXHzGw33dNdXXXfFQqmEalje+egj8As=
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980 h1:lIOOHPEbXzO3vnmx2gok1Tfs31Q8GQqKLc8vVqyQq/I=
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8=
github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ainaU=
Expand Down Expand Up @@ -1512,7 +1508,6 @@ golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/exp v0.0.0-20221211140036-ad323defaf05 h1:T8EldfGCcveFMewH5xAYxxoX3PSQMrsechlUGVFlQBU=
golang.org/x/exp v0.0.0-20230131160201-f062dba9d201 h1:BEABXpNXLEz0WxtA+6CQIz2xkg80e+1zrhWyMcq8VzE=
Expand All @@ -1526,7 +1521,6 @@ golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhp
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028 h1:4+4C/Iv2U4fMZBiMCc98MG1In4gJY5YRhtpDNeDeHWs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand All @@ -1541,7 +1535,6 @@ golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfS
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
Expand All @@ -1558,7 +1551,6 @@ golang.org/x/oauth2 v0.5.0/go.mod h1:9/XBHVqLaWO3/BRHs5jbpYCnOZVjj5V0ndyaAM7KB4I
golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw=
golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g=
golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -1578,21 +1570,17 @@ golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/tools v0.8.0/go.mod h1:JxBZ99ISMI5ViVkT1tr6tdNmXeTrcpVSD3vZ1RsRdN4=
golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
golang.org/x/tools v0.9.3/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
Expand Down Expand Up @@ -1633,7 +1621,6 @@ google.golang.org/genproto v0.0.0-20230303212802-e74f57abe488/go.mod h1:TvhZT5f7
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s=
google.golang.org/genproto v0.0.0-20230320184635-7606e756e683/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s=
google.golang.org/genproto v0.0.0-20230323212658-478b75c54725/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
google.golang.org/genproto v0.0.0-20230525234025-438c736192d0/go.mod h1:9ExIQyXL5hZrHzQceCwuSYwZZ5QZBazOcprJ5rgs3lY=
google.golang.org/genproto v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:zqTuNwFlFRsw5zIts5VnzLQxSRqh+CGOTVMlYbY0Eyk=
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64=
Expand Down Expand Up @@ -1661,12 +1648,10 @@ google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsA
google.golang.org/grpc v1.52.0/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY=
google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g=
google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/dancannon/gorethink.v3 v3.0.5 h1:/g7PWP7zUS6vSNmHSDbjCHQh1Rqn8Jy6zSMQxAsBSMQ=
Expand Down
49 changes: 44 additions & 5 deletions sdk/client/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"encoding/json"
"fmt"
"io"
"strconv"
"sync"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -148,8 +147,17 @@ func (c *commander) Send(ctx context.Context, message Message) error {
return fmt.Errorf("expected a command message, but received %T", message.Data())
}

isRetrying := false
err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
log.Infof("Commander Channel Send: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
}
if err := c.channel.Send(cmd); err != nil {
isRetrying = true
return c.handleGrpcError("Commander Channel Send", err)
}

Expand All @@ -168,22 +176,32 @@ func (c *commander) Recv() <-chan Message {
func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*proto.NginxConfig, error) {
log.Debugf("Downloading config (messageId=%s)", metadata.GetMessageId())
cfg := &proto.NginxConfig{}
isRetrying := false

err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
log.Infof("Commander Downloader: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
}
var (
header *proto.DataChunk_Header
body []byte
)

downloader, err := c.client.Download(c.ctx, &proto.DownloadRequest{Meta: metadata})
if err != nil {
isRetrying = true
return c.handleGrpcError("Commander Downloader", err)
}

LOOP:
for {
chunk, err := downloader.Recv()
if err != nil && err != io.EOF {
isRetrying = true
return c.handleGrpcError("Commander Downloader", err)
}

Expand Down Expand Up @@ -234,9 +252,20 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
payloadChecksum := checksum.Checksum(payload)
chunks := checksum.Chunk(payload, c.chunkSize)

isRetrying := false

return backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
log.Infof("Commander Upload: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
}

sender, err := c.client.Upload(c.ctx)
if err != nil {
isRetrying = true
return c.handleGrpcError("Commander Upload", err)
}

Expand All @@ -251,6 +280,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
},
})
if err != nil {
isRetrying = true
return c.handleGrpcError("Commander Upload Header", err)
}

Expand All @@ -265,13 +295,15 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
},
},
}); err != nil {
return c.handleGrpcError("Commander Upload"+strconv.Itoa(id), err)
isRetrying = true
return c.handleGrpcError(fmt.Sprintf("Commander Upload (chunks=%d)", id), err)
}
}

log.Infof("Upload sending done %s (chunks=%d)", metadata.MessageId, len(chunks))
status, err := sender.CloseAndRecv()
if err != nil {
isRetrying = true
return c.handleGrpcError("Commander Upload CloseAndRecv", err)
}

Expand Down Expand Up @@ -325,14 +357,24 @@ loop:
case <-ctx.Done():
break loop
default:
isRetrying := false
err := backoff.WaitUntil(ctx, c.backoffSettings, func() error {
select {
case <-ctx.Done():
return nil
default:
if isRetrying {
log.Infof("Commander Channel Recv: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
}

cmd, err := c.channel.Recv()
log.Infof("Commander received %v, %v", cmd, err)
if err != nil {
isRetrying = true
return c.handleGrpcError("Commander Channel Recv", err)
}

Expand All @@ -357,8 +399,5 @@ func (c *commander) handleGrpcError(messagePrefix string, err error) error {
log.Errorf("%s: unknown grpc error while communicating with %s, %v", messagePrefix, c.grpc.Target(), err)
}

log.Infof("%s: retrying to connect to %s", messagePrefix, c.grpc.Target())
_ = c.createClient()

return err
}
8 changes: 8 additions & 0 deletions sdk/client/commander_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ func TestCommander_Recv_Reconnect(t *testing.T) {

commanderClient.WithDialOptions(getDialOptions(dialer)...)

time.Sleep(50 * time.Millisecond)

defer func() {
commanderClient.Close()
if err := stopMockServer(grpcServer, dialer); err != nil {
Expand Down Expand Up @@ -298,6 +300,8 @@ func TestCommander_Send_Reconnect(t *testing.T) {
grpcServer, _, dialer = startCommanderMockServer()
commanderClient.WithDialOptions(getDialOptions(dialer)...)

time.Sleep(50 * time.Millisecond)

defer func() {
commanderClient.Close()
if err := stopMockServer(grpcServer, dialer); err != nil {
Expand Down Expand Up @@ -362,6 +366,8 @@ func TestCommander_Download_Reconnect(t *testing.T) {
}
}()

time.Sleep(10 * time.Millisecond)

actual, err := commanderClient.Download(ctx, &proto.Metadata{MessageId: "1234"})

assert.Nil(t, err)
Expand Down Expand Up @@ -532,6 +538,8 @@ func TestCommander_Upload_Reconnect(t *testing.T) {
}
}()

time.Sleep(50 * time.Millisecond)

err = commanderClient.Upload(ctx, expectedNginxConfig, "1234")
assert.Nil(t, err)

Expand Down
25 changes: 22 additions & 3 deletions sdk/client/metric_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,19 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error {
if !ok {
return fmt.Errorf("MetricReporter expected a metrics report message, but received %T", message.Data())
}

isRetrying := false

err = backoff.WaitUntil(r.ctx, r.backoffSettings, func() error {
if isRetrying {
log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target())
err := r.createClient()
if err != nil {
return err
}
}
if err := r.channel.Send(report); err != nil {
isRetrying = true
return r.handleGrpcError("Metric Reporter Channel Send", err)
}

Expand All @@ -164,8 +175,19 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error {
if !ok {
return fmt.Errorf("MetricReporter expected an events report message, but received %T", message.Data())
}

isRetrying := false

err = backoff.WaitUntil(r.ctx, r.backoffSettings, func() error {
if isRetrying {
log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target())
err = r.createClient()
if err != nil {
return err
}
}
if err := r.eventsChannel.Send(report); err != nil {
isRetrying = true
return r.handleGrpcError("Metric Reporter Events Channel Send", err)
}

Expand Down Expand Up @@ -201,8 +223,5 @@ func (r *metricReporter) handleGrpcError(messagePrefix string, err error) error
log.Errorf("%s: unknown grpc error while communicating with %s, %v", messagePrefix, r.grpc.Target(), err)
}

log.Infof("%s: retrying to connect to %s", messagePrefix, r.grpc.Target())
_ = r.createClient()

return err
}
14 changes: 8 additions & 6 deletions sdk/client/metric_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func TestMetricReporter_Connect_NoServer(t *testing.T) {
metricReporterClient.WithServer("unknown")
metricReporterClient.WithDialOptions(grpcDialOptions...)
metricReporterClient.WithBackoffSettings(backoff.BackoffSettings{
InitialInterval: 100 * time.Millisecond,
MaxInterval: 100 * time.Millisecond,
InitialInterval: 50 * time.Millisecond,
MaxInterval: 50 * time.Millisecond,
MaxElapsedTime: 300 * time.Millisecond,
})

Expand Down Expand Up @@ -116,8 +116,8 @@ func TestMetricReporter_Send_Reconnect(t *testing.T) {

metricReporterClient := createTestMetricReporterClient(dialer)
metricReporterClient.WithBackoffSettings(backoff.BackoffSettings{
InitialInterval: 100 * time.Millisecond,
MaxInterval: 100 * time.Millisecond,
InitialInterval: 50 * time.Millisecond,
MaxInterval: 50 * time.Millisecond,
MaxElapsedTime: 30 * time.Second,
})
err := metricReporterClient.Connect(ctx)
Expand All @@ -137,6 +137,8 @@ func TestMetricReporter_Send_Reconnect(t *testing.T) {
}
}()

time.Sleep(50 * time.Millisecond)

err = metricReporterClient.Send(ctx, MessageFromMetrics(&proto.MetricsReport{
Meta: &proto.Metadata{
MessageId: "1234",
Expand Down Expand Up @@ -285,8 +287,8 @@ func createTestMetricReporterClient(dialer func(context.Context, string) (net.Co
metricReporterClient.WithServer("bufnet")
metricReporterClient.WithDialOptions(getDialOptions(dialer)...)
metricReporterClient.WithBackoffSettings(backoff.BackoffSettings{
InitialInterval: 100 * time.Millisecond,
MaxInterval: 100 * time.Millisecond,
InitialInterval: 50 * time.Millisecond,
MaxInterval: 50 * time.Millisecond,
MaxElapsedTime: 300 * time.Millisecond,
})

Expand Down
Loading

0 comments on commit 236d2be

Please sign in to comment.