Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix grpc reconnect logic #401

Merged
merged 3 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,6 @@ github.com/go-kit/log v0.2.0 h1:7i2K3eKTos3Vc0enKCfnVcgHh2olr/MyfboYq7cAcFw=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw=
Expand Down Expand Up @@ -1210,7 +1209,6 @@ github.com/kisielk/errcheck v1.5.0 h1:e8esj/e4R+SAOwFwN+n3zr0nYeCyeweozKfO23MvHz
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.15.13/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
Expand Down Expand Up @@ -1301,7 +1299,6 @@ github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0Mw
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml/v2 v2.0.5/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas=
github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek=
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
github.com/phayes/checkstyle v0.0.0-20170904204023-bfd46e6a821d h1:CdDQnGF8Nq9ocOS/xlSptM1N3BbrA6/kmaep5ggwaIA=
github.com/phayes/checkstyle v0.0.0-20170904204023-bfd46e6a821d/go.mod h1:3OzsM7FXDQlpCiw2j81fOmAwQLnZnLGXVKUzeKQXIAw=
Expand All @@ -1319,6 +1316,7 @@ github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDf
github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk=
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
Expand Down Expand Up @@ -1379,12 +1377,10 @@ github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B
github.com/spf13/afero v1.3.3/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4=
github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU=
github.com/spf13/cobra v1.1.1/go.mod h1:WnodtKOvamDL/PwE2M4iKs8aMDBZ5Q5klgD3qfVJQMI=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spf13/viper v1.14.0/go.mod h1:WT//axPky3FdvXHzGw33dNdXXXfFQqmEalje+egj8As=
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980 h1:lIOOHPEbXzO3vnmx2gok1Tfs31Q8GQqKLc8vVqyQq/I=
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8=
github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ainaU=
Expand Down Expand Up @@ -1512,7 +1508,6 @@ golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/exp v0.0.0-20221211140036-ad323defaf05 h1:T8EldfGCcveFMewH5xAYxxoX3PSQMrsechlUGVFlQBU=
golang.org/x/exp v0.0.0-20230131160201-f062dba9d201 h1:BEABXpNXLEz0WxtA+6CQIz2xkg80e+1zrhWyMcq8VzE=
Expand All @@ -1526,7 +1521,6 @@ golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhp
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028 h1:4+4C/Iv2U4fMZBiMCc98MG1In4gJY5YRhtpDNeDeHWs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand All @@ -1541,7 +1535,6 @@ golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfS
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
Expand All @@ -1558,7 +1551,6 @@ golang.org/x/oauth2 v0.5.0/go.mod h1:9/XBHVqLaWO3/BRHs5jbpYCnOZVjj5V0ndyaAM7KB4I
golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw=
golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g=
golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -1578,21 +1570,17 @@ golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/tools v0.8.0/go.mod h1:JxBZ99ISMI5ViVkT1tr6tdNmXeTrcpVSD3vZ1RsRdN4=
golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
golang.org/x/tools v0.9.3/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
Expand Down Expand Up @@ -1633,7 +1621,6 @@ google.golang.org/genproto v0.0.0-20230303212802-e74f57abe488/go.mod h1:TvhZT5f7
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s=
google.golang.org/genproto v0.0.0-20230320184635-7606e756e683/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s=
google.golang.org/genproto v0.0.0-20230323212658-478b75c54725/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
google.golang.org/genproto v0.0.0-20230525234025-438c736192d0/go.mod h1:9ExIQyXL5hZrHzQceCwuSYwZZ5QZBazOcprJ5rgs3lY=
google.golang.org/genproto v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:zqTuNwFlFRsw5zIts5VnzLQxSRqh+CGOTVMlYbY0Eyk=
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64=
Expand Down Expand Up @@ -1661,12 +1648,10 @@ google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsA
google.golang.org/grpc v1.52.0/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY=
google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g=
google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/dancannon/gorethink.v3 v3.0.5 h1:/g7PWP7zUS6vSNmHSDbjCHQh1Rqn8Jy6zSMQxAsBSMQ=
Expand Down
49 changes: 44 additions & 5 deletions sdk/client/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"encoding/json"
"fmt"
"io"
"strconv"
"sync"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -148,8 +147,17 @@ func (c *commander) Send(ctx context.Context, message Message) error {
return fmt.Errorf("expected a command message, but received %T", message.Data())
}

isRetrying := false
err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
log.Infof("Commander Channel Send: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
}
if err := c.channel.Send(cmd); err != nil {
isRetrying = true
return c.handleGrpcError("Commander Channel Send", err)
}

Expand All @@ -168,22 +176,32 @@ func (c *commander) Recv() <-chan Message {
func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*proto.NginxConfig, error) {
log.Debugf("Downloading config (messageId=%s)", metadata.GetMessageId())
cfg := &proto.NginxConfig{}
isRetrying := false

err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
log.Infof("Commander Downloader: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
}
var (
header *proto.DataChunk_Header
body []byte
)

downloader, err := c.client.Download(c.ctx, &proto.DownloadRequest{Meta: metadata})
if err != nil {
isRetrying = true
return c.handleGrpcError("Commander Downloader", err)
}

LOOP:
for {
chunk, err := downloader.Recv()
if err != nil && err != io.EOF {
isRetrying = true
return c.handleGrpcError("Commander Downloader", err)
}

Expand Down Expand Up @@ -234,9 +252,20 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
payloadChecksum := checksum.Checksum(payload)
chunks := checksum.Chunk(payload, c.chunkSize)

isRetrying := false

return backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
log.Infof("Commander Upload: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
}

sender, err := c.client.Upload(c.ctx)
if err != nil {
isRetrying = true
return c.handleGrpcError("Commander Upload", err)
}

Expand All @@ -251,6 +280,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
},
})
if err != nil {
isRetrying = true
return c.handleGrpcError("Commander Upload Header", err)
}

Expand All @@ -265,13 +295,15 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
},
},
}); err != nil {
return c.handleGrpcError("Commander Upload"+strconv.Itoa(id), err)
isRetrying = true
return c.handleGrpcError(fmt.Sprintf("Commander Upload (chunks=%d)", id), err)
}
}

log.Infof("Upload sending done %s (chunks=%d)", metadata.MessageId, len(chunks))
status, err := sender.CloseAndRecv()
if err != nil {
isRetrying = true
return c.handleGrpcError("Commander Upload CloseAndRecv", err)
}

Expand Down Expand Up @@ -325,14 +357,24 @@ loop:
case <-ctx.Done():
break loop
default:
isRetrying := false
err := backoff.WaitUntil(ctx, c.backoffSettings, func() error {
select {
case <-ctx.Done():
return nil
default:
if isRetrying {
log.Infof("Commander Channel Recv: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
}

cmd, err := c.channel.Recv()
log.Infof("Commander received %v, %v", cmd, err)
if err != nil {
isRetrying = true
return c.handleGrpcError("Commander Channel Recv", err)
}

Expand All @@ -357,8 +399,5 @@ func (c *commander) handleGrpcError(messagePrefix string, err error) error {
log.Errorf("%s: unknown grpc error while communicating with %s, %v", messagePrefix, c.grpc.Target(), err)
}

log.Infof("%s: retrying to connect to %s", messagePrefix, c.grpc.Target())
_ = c.createClient()

return err
}
8 changes: 8 additions & 0 deletions sdk/client/commander_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ func TestCommander_Recv_Reconnect(t *testing.T) {

commanderClient.WithDialOptions(getDialOptions(dialer)...)

time.Sleep(50 * time.Millisecond)

defer func() {
commanderClient.Close()
if err := stopMockServer(grpcServer, dialer); err != nil {
Expand Down Expand Up @@ -298,6 +300,8 @@ func TestCommander_Send_Reconnect(t *testing.T) {
grpcServer, _, dialer = startCommanderMockServer()
commanderClient.WithDialOptions(getDialOptions(dialer)...)

time.Sleep(50 * time.Millisecond)

defer func() {
commanderClient.Close()
if err := stopMockServer(grpcServer, dialer); err != nil {
Expand Down Expand Up @@ -362,6 +366,8 @@ func TestCommander_Download_Reconnect(t *testing.T) {
}
}()

time.Sleep(10 * time.Millisecond)

actual, err := commanderClient.Download(ctx, &proto.Metadata{MessageId: "1234"})

assert.Nil(t, err)
Expand Down Expand Up @@ -532,6 +538,8 @@ func TestCommander_Upload_Reconnect(t *testing.T) {
}
}()

time.Sleep(50 * time.Millisecond)

err = commanderClient.Upload(ctx, expectedNginxConfig, "1234")
assert.Nil(t, err)

Expand Down
25 changes: 22 additions & 3 deletions sdk/client/metric_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,19 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error {
if !ok {
return fmt.Errorf("MetricReporter expected a metrics report message, but received %T", message.Data())
}

isRetrying := false

err = backoff.WaitUntil(r.ctx, r.backoffSettings, func() error {
if isRetrying {
log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target())
err := r.createClient()
if err != nil {
return err
}
}
if err := r.channel.Send(report); err != nil {
isRetrying = true
return r.handleGrpcError("Metric Reporter Channel Send", err)
}

Expand All @@ -164,8 +175,19 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error {
if !ok {
return fmt.Errorf("MetricReporter expected an events report message, but received %T", message.Data())
}

isRetrying := false

err = backoff.WaitUntil(r.ctx, r.backoffSettings, func() error {
if isRetrying {
log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target())
err = r.createClient()
if err != nil {
return err
}
}
if err := r.eventsChannel.Send(report); err != nil {
isRetrying = true
return r.handleGrpcError("Metric Reporter Events Channel Send", err)
}

Expand Down Expand Up @@ -201,8 +223,5 @@ func (r *metricReporter) handleGrpcError(messagePrefix string, err error) error
log.Errorf("%s: unknown grpc error while communicating with %s, %v", messagePrefix, r.grpc.Target(), err)
}

log.Infof("%s: retrying to connect to %s", messagePrefix, r.grpc.Target())
_ = r.createClient()

return err
}
14 changes: 8 additions & 6 deletions sdk/client/metric_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func TestMetricReporter_Connect_NoServer(t *testing.T) {
metricReporterClient.WithServer("unknown")
metricReporterClient.WithDialOptions(grpcDialOptions...)
metricReporterClient.WithBackoffSettings(backoff.BackoffSettings{
InitialInterval: 100 * time.Millisecond,
MaxInterval: 100 * time.Millisecond,
InitialInterval: 50 * time.Millisecond,
MaxInterval: 50 * time.Millisecond,
MaxElapsedTime: 300 * time.Millisecond,
})

Expand Down Expand Up @@ -116,8 +116,8 @@ func TestMetricReporter_Send_Reconnect(t *testing.T) {

metricReporterClient := createTestMetricReporterClient(dialer)
metricReporterClient.WithBackoffSettings(backoff.BackoffSettings{
InitialInterval: 100 * time.Millisecond,
MaxInterval: 100 * time.Millisecond,
InitialInterval: 50 * time.Millisecond,
MaxInterval: 50 * time.Millisecond,
MaxElapsedTime: 30 * time.Second,
})
err := metricReporterClient.Connect(ctx)
Expand All @@ -137,6 +137,8 @@ func TestMetricReporter_Send_Reconnect(t *testing.T) {
}
}()

time.Sleep(50 * time.Millisecond)

err = metricReporterClient.Send(ctx, MessageFromMetrics(&proto.MetricsReport{
Meta: &proto.Metadata{
MessageId: "1234",
Expand Down Expand Up @@ -285,8 +287,8 @@ func createTestMetricReporterClient(dialer func(context.Context, string) (net.Co
metricReporterClient.WithServer("bufnet")
metricReporterClient.WithDialOptions(getDialOptions(dialer)...)
metricReporterClient.WithBackoffSettings(backoff.BackoffSettings{
InitialInterval: 100 * time.Millisecond,
MaxInterval: 100 * time.Millisecond,
InitialInterval: 50 * time.Millisecond,
MaxInterval: 50 * time.Millisecond,
MaxElapsedTime: 300 * time.Millisecond,
})

Expand Down
Loading