8
8
9
9
use crate :: proto:: spire:: api:: agent:: delegatedidentity:: v1:: {
10
10
delegated_identity_client:: DelegatedIdentityClient as DelegatedIdentityApiClient ,
11
- SubscribeToX509BundlesRequest , SubscribeToX509BundlesResponse , SubscribeToX509sviDsRequest ,
12
- SubscribeToX509sviDsResponse ,
11
+ FetchJwtsviDsRequest , SubscribeToX509BundlesRequest , SubscribeToX509BundlesResponse ,
12
+ SubscribeToX509sviDsRequest , SubscribeToX509sviDsResponse , SubscribeToJwtBundlesRequest ,
13
+ SubscribeToJwtBundlesResponse ,
13
14
} ;
15
+ use crate :: proto:: spire:: api:: types:: Jwtsvid as ProtoJwtSvid ;
14
16
use spiffe:: bundle:: x509:: { X509Bundle , X509BundleSet } ;
17
+ use spiffe:: bundle:: jwt:: { JwtBundleSet , JwtBundle } ;
15
18
use spiffe:: spiffe_id:: TrustDomain ;
19
+ use spiffe:: svid:: jwt:: JwtSvid ;
16
20
use spiffe:: svid:: x509:: X509Svid ;
17
21
use spiffe:: workload_api:: address:: validate_socket_path;
18
22
use tokio_stream:: { Stream , StreamExt } ;
19
23
20
24
use crate :: selectors:: Selector ;
21
25
use spiffe:: workload_api:: client:: { ClientError , DEFAULT_SVID } ;
22
26
use std:: convert:: { Into , TryFrom } ;
27
+ use std:: str:: FromStr ;
23
28
use tokio:: net:: UnixStream ;
24
29
use tonic:: transport:: { Endpoint , Uri } ;
25
30
use tower:: service_fn;
26
31
27
32
/// Name of the environment variable that holds the default socket endpoint path.
28
- pub const ADMIN_SOCKET_ENV : & str = "SPIFFE_ADMIN_ENDPOINT_SOCKET " ;
33
+ pub const ADMIN_SOCKET_ENV : & str = "SPIRE_ADMIN_ENDPOINT_SOCKET " ;
29
34
30
35
/// Gets the endpoint socket endpoint path from the environment variable `ADMIN_SOCKET_ENV`,
31
36
/// as described in [SPIFFE standard](https://github.com/spiffe/spiffe/blob/main/standards/SPIFFE_Workload_Endpoint.md#4-locating-the-endpoint).
@@ -132,7 +137,7 @@ impl DelegatedIdentityClient {
132
137
///
133
138
/// Returns [`ClientError`] if the gRPC call fails or if the SVID could not be parsed from the gRPC response.
134
139
pub async fn fetch_x509_svid (
135
- mut self ,
140
+ & mut self ,
136
141
selectors : Vec < Selector > ,
137
142
) -> Result < X509Svid , ClientError > {
138
143
let request = SubscribeToX509sviDsRequest {
@@ -172,7 +177,7 @@ impl DelegatedIdentityClient {
172
177
///
173
178
/// Individual stream items might also be errors if there's an issue processing the response for a specific update.
174
179
pub async fn stream_x509_svids (
175
- mut self ,
180
+ & mut self ,
176
181
selectors : Vec < Selector > ,
177
182
) -> Result < impl Stream < Item = Result < X509Svid , ClientError > > , ClientError > {
178
183
let request = SubscribeToX509sviDsRequest {
@@ -197,7 +202,7 @@ impl DelegatedIdentityClient {
197
202
///
198
203
/// The function returns a variant of [`ClientError`] if there is en error connecting to the Workload API or
199
204
/// there is a problem processing the response.
200
- pub async fn fetch_x509_bundles ( mut self ) -> Result < X509BundleSet , ClientError > {
205
+ pub async fn fetch_x509_bundles ( & mut self ) -> Result < X509BundleSet , ClientError > {
201
206
let request = SubscribeToX509BundlesRequest :: default ( ) ;
202
207
203
208
let response: tonic:: Response < tonic:: Streaming < SubscribeToX509BundlesResponse > > =
@@ -227,7 +232,7 @@ impl DelegatedIdentityClient {
227
232
///
228
233
/// Individual stream items might also be errors if there's an issue processing the response for a specific update.
229
234
pub async fn stream_x509_bundles (
230
- mut self ,
235
+ & mut self ,
231
236
) -> Result < impl Stream < Item = Result < X509BundleSet , ClientError > > , ClientError > {
232
237
let request = SubscribeToX509BundlesRequest :: default ( ) ;
233
238
@@ -242,6 +247,83 @@ impl DelegatedIdentityClient {
242
247
243
248
Ok ( stream)
244
249
}
250
+
251
+ /// Fetches a list of [`JwtSvid`] parsing the JWT token in the Workload API response, for the given audience and selectors.
252
+ ///
253
+ /// # Arguments
254
+ ///
255
+ /// * `audience` - A list of audiences to include in the JWT token. Cannot be empty nor contain only empty strings.
256
+ /// * `selectors` - A list of selectors to filter the list of [`JwtSvid`].
257
+ ///
258
+ /// # Errors
259
+ ///
260
+ /// The function returns a variant of [`ClientError`] if there is en error connecting to the Workload API or
261
+ /// there is a problem processing the response.
262
+ pub async fn fetch_jwt_svids < T : AsRef < str > + ToString > (
263
+ & mut self ,
264
+ audience : & [ T ] ,
265
+ selectors : Vec < Selector > ,
266
+ ) -> Result < Vec < JwtSvid > , ClientError > {
267
+ let request = FetchJwtsviDsRequest {
268
+ audience : audience. iter ( ) . map ( |s| s. to_string ( ) ) . collect ( ) ,
269
+ selectors : selectors. into_iter ( ) . map ( |s| s. into ( ) ) . collect ( ) ,
270
+ } ;
271
+
272
+ DelegatedIdentityClient :: parse_jwt_svid_from_grpc_response (
273
+ self . client
274
+ . fetch_jwtsvi_ds ( request)
275
+ . await ?
276
+ . into_inner ( )
277
+ . svids ,
278
+ )
279
+ }
280
+
281
+
282
+
283
+ /// Watches the stream of [`JwtBundleSet`] updates.
284
+ ///
285
+ /// This function establishes a stream with the Workload API to continuously receive updates for the [`JwtBundleSet`].
286
+ /// The returned stream can be used to asynchronously yield new `JwtBundleSet` updates as they become available.
287
+ ///
288
+ /// # Returns
289
+ ///
290
+ /// Returns a stream of `Result<JwtBundleSet, ClientError>`. Each item represents an updated [`JwtBundleSet`] or an error if
291
+ /// there was a problem processing an update from the stream.
292
+ ///
293
+ /// # Errors
294
+ ///
295
+ /// The function can return an error variant of [`ClientError`] in the following scenarios:
296
+ ///
297
+ /// * There's an issue connecting to the Workload API.
298
+ /// * An error occurs while setting up the stream.
299
+ ///
300
+ /// Individual stream items might also be errors if there's an issue processing the response for a specific update.
301
+ pub async fn stream_jwt_bundles (
302
+ & mut self ,
303
+ ) -> Result < impl Stream < Item = Result < JwtBundleSet , ClientError > > , ClientError > {
304
+ let request = SubscribeToJwtBundlesRequest :: default ( ) ;
305
+ let response = self . client . subscribe_to_jwt_bundles ( request) . await ?;
306
+ Ok ( response. into_inner ( ) . map ( |message| {
307
+ message
308
+ . map_err ( ClientError :: from)
309
+ . and_then ( DelegatedIdentityClient :: parse_jwt_bundle_set_from_grpc_response)
310
+ } ) )
311
+ }
312
+
313
+ /// Fetches [`JwtBundleSet`] that is a set of [`JwtBundle`] keyed by the trust domain to which they belong.
314
+ ///
315
+ /// # Errors
316
+ ///
317
+ /// The function returns a variant of [`ClientError`] if there is en error connecting to the Workload API or
318
+ /// there is a problem processing the response.
319
+ pub async fn fetch_jwt_bundles (
320
+ & mut self ,
321
+ ) -> Result < JwtBundleSet , ClientError > {
322
+ let request = SubscribeToJwtBundlesRequest :: default ( ) ;
323
+ let response = self . client . subscribe_to_jwt_bundles ( request) . await ?;
324
+ let initial = response. into_inner ( ) . message ( ) . await ?;
325
+ DelegatedIdentityClient :: parse_jwt_bundle_set_from_grpc_response ( initial. ok_or ( ClientError :: EmptyResponse ) ?)
326
+ }
245
327
}
246
328
247
329
impl DelegatedIdentityClient {
@@ -266,6 +348,32 @@ impl DelegatedIdentityClient {
266
348
. map_err ( |e| e. into ( ) )
267
349
}
268
350
351
+ fn parse_jwt_svid_from_grpc_response (
352
+ svids : Vec < ProtoJwtSvid > ,
353
+ ) -> Result < Vec < JwtSvid > , ClientError > {
354
+ let result: Result < Vec < JwtSvid > , ClientError > = svids
355
+ . iter ( )
356
+ . map ( |r| JwtSvid :: from_str ( & r. token ) . map_err ( ClientError :: InvalidJwtSvid ) )
357
+ . collect ( ) ;
358
+ result
359
+ }
360
+
361
+ fn parse_jwt_bundle_set_from_grpc_response (
362
+ response : SubscribeToJwtBundlesResponse ,
363
+ ) -> Result < JwtBundleSet , ClientError > {
364
+ let mut bundle_set = JwtBundleSet :: new ( ) ;
365
+
366
+ for ( td, bundle_data) in response. bundles . into_iter ( ) {
367
+ let trust_domain = TrustDomain :: try_from ( td) ?;
368
+ let bundle = JwtBundle :: from_jwt_authorities ( trust_domain, & bundle_data)
369
+ . map_err ( ClientError :: from) ?;
370
+
371
+ bundle_set. add_bundle ( bundle) ;
372
+ }
373
+
374
+ Ok ( bundle_set)
375
+ }
376
+
269
377
fn parse_x509_bundle_set_from_grpc_response (
270
378
response : SubscribeToX509BundlesResponse ,
271
379
) -> Result < X509BundleSet , ClientError > {
0 commit comments