@@ -6,6 +6,8 @@ import cats.syntax.all._
6
6
import fs2 ._
7
7
import org .http4s ._
8
8
import org .http4s .dsl .request ._
9
+ import org .http4s .grpc .GrpcExceptions .StatusRuntimeException
10
+ import org .http4s .grpc .GrpcStatus ._
9
11
import org .http4s .grpc .codecs .NamedHeaders
10
12
import org .http4s .headers .Allow
11
13
import org .http4s .headers .Trailer
@@ -38,7 +40,7 @@ object ServerGrpc {
38
40
): HttpRoutes [F ] = HttpRoutes .of[F ] {
39
41
case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName =>
40
42
for {
41
- status <- Deferred [F , (Int , Option [String ])]
43
+ status <- Deferred [F , (Code , Option [String ])]
42
44
trailers = status.get.map { case (i, message) =>
43
45
Headers (
44
46
NamedHeaders .GrpcStatus (i)
@@ -51,12 +53,7 @@ object ServerGrpc {
51
53
.evalMap(f(_, req.headers))
52
54
.flatMap(codecs.Messages .encodeSingle(encode)(_))
53
55
.through(timeoutStream(_)(timeout.map(_.duration)))
54
- .onFinalizeCaseWeak {
55
- case Resource .ExitCase .Errored (_ : TimeoutException ) => status.complete((4 , None )).void
56
- case Resource .ExitCase .Errored (e) => status.complete((2 , e.toString().some)).void
57
- case Resource .ExitCase .Canceled => status.complete((1 , None )).void
58
- case Resource .ExitCase .Succeeded => status.complete((0 , None )).void
59
- }
56
+ .onFinalizeCaseWeak(updateStatus(status))
60
57
.mask // ensures body closure without rst-stream
61
58
62
59
Response [F ](Status .Ok , HttpVersion .`HTTP/2`)
@@ -81,7 +78,7 @@ object ServerGrpc {
81
78
): HttpRoutes [F ] = HttpRoutes .of[F ] {
82
79
case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName =>
83
80
for {
84
- status <- Deferred [F , (Int , Option [String ])]
81
+ status <- Deferred [F , (Code , Option [String ])]
85
82
trailers = status.get.map { case (i, message) =>
86
83
Headers (
87
84
NamedHeaders .GrpcStatus (i)
@@ -94,12 +91,7 @@ object ServerGrpc {
94
91
.flatMap(f(_, req.headers))
95
92
.through(codecs.Messages .encode(encode))
96
93
.through(timeoutStream(_)(timeout.map(_.duration)))
97
- .onFinalizeCaseWeak {
98
- case Resource .ExitCase .Errored (_ : TimeoutException ) => status.complete((4 , None )).void
99
- case Resource .ExitCase .Errored (e) => status.complete((2 , e.toString().some)).void
100
- case Resource .ExitCase .Canceled => status.complete((1 , None )).void
101
- case Resource .ExitCase .Succeeded => status.complete((0 , None )).void
102
- }
94
+ .onFinalizeCaseWeak(updateStatus(status))
103
95
.mask // ensures body closure without rst-stream
104
96
Response [F ](Status .Ok , HttpVersion .`HTTP/2`)
105
97
.putHeaders(
@@ -123,7 +115,7 @@ object ServerGrpc {
123
115
): HttpRoutes [F ] = HttpRoutes .of[F ] {
124
116
case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName =>
125
117
for {
126
- status <- Deferred [F , (Int , Option [String ])]
118
+ status <- Deferred [F , (Code , Option [String ])]
127
119
trailers = status.get.map { case (i, message) =>
128
120
Headers (
129
121
NamedHeaders .GrpcStatus (i)
@@ -136,12 +128,7 @@ object ServerGrpc {
136
128
.eval(f(codecs.Messages .decode(decode)(req.body), req.headers))
137
129
.flatMap(codecs.Messages .encodeSingle(encode)(_))
138
130
.through(timeoutStream(_)(timeout.map(_.duration)))
139
- .onFinalizeCaseWeak {
140
- case Resource .ExitCase .Errored (_ : TimeoutException ) => status.complete((4 , None )).void
141
- case Resource .ExitCase .Errored (e) => status.complete((2 , e.toString().some)).void
142
- case Resource .ExitCase .Canceled => status.complete((1 , None )).void
143
- case Resource .ExitCase .Succeeded => status.complete((0 , None )).void
144
- }
131
+ .onFinalizeCaseWeak(updateStatus(status))
145
132
.mask // ensures body closure without rst-stream
146
133
147
134
Response [F ](Status .Ok , HttpVersion .`HTTP/2`)
@@ -166,7 +153,7 @@ object ServerGrpc {
166
153
): HttpRoutes [F ] = HttpRoutes .of[F ] {
167
154
case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName =>
168
155
for {
169
- status <- Deferred [F , (Int , Option [String ])]
156
+ status <- Deferred [F , (Code , Option [String ])]
170
157
trailers = status.get.map { case (i, message) =>
171
158
Headers (
172
159
NamedHeaders .GrpcStatus (i)
@@ -178,12 +165,7 @@ object ServerGrpc {
178
165
val body = f(codecs.Messages .decode(decode)(req.body), req.headers)
179
166
.through(codecs.Messages .encode(encode))
180
167
.through(timeoutStream(_)(timeout.map(_.duration)))
181
- .onFinalizeCaseWeak {
182
- case Resource .ExitCase .Errored (_ : TimeoutException ) => status.complete((4 , None )).void
183
- case Resource .ExitCase .Errored (e) => status.complete((2 , e.toString().some)).void
184
- case Resource .ExitCase .Canceled => status.complete((1 , None )).void
185
- case Resource .ExitCase .Succeeded => status.complete((0 , None )).void
186
- }
168
+ .onFinalizeCaseWeak(updateStatus(status))
187
169
.mask // ensures body closure without rst-stream
188
170
189
171
Response [F ](Status .Ok , HttpVersion .`HTTP/2`)
@@ -204,7 +186,7 @@ object ServerGrpc {
204
186
.putHeaders(
205
187
SharedGrpc .ContentType ,
206
188
SharedGrpc .TE ,
207
- NamedHeaders .GrpcStatus (12 ),
189
+ NamedHeaders .GrpcStatus (Unimplemented ),
208
190
" grpc-message" -> s " unknown method $mN for service $sN" ,
209
191
)
210
192
.pure[F ]
@@ -216,7 +198,7 @@ object ServerGrpc {
216
198
.putHeaders(
217
199
SharedGrpc .ContentType ,
218
200
SharedGrpc .TE ,
219
- NamedHeaders .GrpcStatus (12 ),
201
+ NamedHeaders .GrpcStatus (Unimplemented ),
220
202
" grpc-message" -> s " unknown service $sN" ,
221
203
)
222
204
.pure[F ]
@@ -225,7 +207,7 @@ object ServerGrpc {
225
207
.putHeaders(
226
208
SharedGrpc .ContentType ,
227
209
SharedGrpc .TE ,
228
- NamedHeaders .GrpcStatus (12 ),
210
+ NamedHeaders .GrpcStatus (Unimplemented ),
229
211
" grpc-message" -> s " unknown method $other" ,
230
212
)
231
213
.pure[F ]
@@ -234,7 +216,7 @@ object ServerGrpc {
234
216
.putHeaders(
235
217
SharedGrpc .ContentType ,
236
218
SharedGrpc .TE ,
237
- NamedHeaders .GrpcStatus (12 ),
219
+ NamedHeaders .GrpcStatus (Unimplemented ),
238
220
" grpc-message" -> s " unknown request " ,
239
221
)
240
222
.pure[F ]
@@ -248,4 +230,14 @@ object ServerGrpc {
248
230
case Some (value) => s.timeout(value)
249
231
}
250
232
233
+ private def updateStatus [F [_]: Concurrent ](
234
+ status : Deferred [F , (Code , Option [String ])]
235
+ ): Resource .ExitCase => F [Unit ] = {
236
+ case Resource .ExitCase .Errored (StatusRuntimeException (c, m)) => status.complete((c, m)).void
237
+ case Resource .ExitCase .Errored (_ : TimeoutException ) =>
238
+ status.complete((DeadlineExceeded , None )).void
239
+ case Resource .ExitCase .Errored (e) => status.complete((Unknown , e.toString().some)).void
240
+ case Resource .ExitCase .Canceled => status.complete((Cancelled , None )).void
241
+ case Resource .ExitCase .Succeeded => status.complete((Ok , None )).void
242
+ }
251
243
}
0 commit comments