-
Notifications
You must be signed in to change notification settings - Fork 7k
[Core] support token auth in ray client server #58557
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 31 commits
Commits
Show all changes
45 commits
Select commit
Hold shift + click to select a range
f0c063b
[core] Support token based auth in ray dashboard UI
0d3316c
fix lint issues
7660ade
address comments
5234bfe
fix lint
b30bf04
Merge remote-tracking branch 'upstream/master' into token_auth_8
f61c27f
fix typo
2da0b9b
[core] Configure an interceptor to pass auth token in python direct g…
1f1897d
fix lint issues
6aa4ce0
reduce code duplication
8186c09
empty commit
d90ffd3
[Core] Add Service Interceptor to support token authentication in das…
b464cd5
[core] Token auth usability improvements
29f511a
address comment
26de367
add test_grpc_authentication_server_interceptor to BUILD.bazel
03e5c52
Merge branch 'token_auth_10' into token_auth_11
sampan-s-nayak 2b5e01b
fix typo
f38591d
Merge branch 'token_auth_10' into token_auth_11
sampan-s-nayak c018203
[core] use client interceptor for adding auth token in c++ client calls
d66af16
fix lint issues
fbcfea4
separate out intererceptor code
d59d8ea
Merge branch 'master' into token_auth_10
edoakes 267b440
Merge branch 'token_auth_10' into token_auth_11
sampan-s-nayak 4e85216
Merge branch 'master' into token_auth_10
sampan-s-nayak e0b8b93
Merge branch 'token_auth_10' into token_auth_11
sampan-s-nayak 214b0e1
Merge branch 'token_auth_11' into token_auth_12
sampan-s-nayak 1240a8c
empty commit
c857c11
Merge branch 'token_auth_10' into token_auth_11
sampan-s-nayak 3e473ba
address comment
86f62f2
Merge branch 'token_auth_11' into token_auth_12
sampan-s-nayak 6fe7473
Merge branch 'master' into token_auth_11
edoakes 1b0b89b
Merge branch 'master' of https://github.com/ray-project/ray into toke…
edoakes 0a473bf
Merge branch 'token_auth_11' into token_auth_12
sampan-s-nayak ab8ea56
Merge branch 'master' into token_auth_11
edoakes 790150c
Merge branch 'token_auth_11' into token_auth_12
sampan-s-nayak 215b625
[Core] support token auth in ray client server
4a0566b
fix lint
ca93ce0
Merge branch 'master' into token_auth_12
sampan-s-nayak 99513f7
fix import after merge
0317fda
fix lint issues
a2f92ba
Merge branch 'token_auth_12' into token_auth_14
sampan-s-nayak fb3d6cf
fix imports
a7d5cee
Merge remote-tracking branch 'upstream/token_auth_14' into token_auth_14
d7428a0
Merge branch 'master' into token_auth_14
edoakes 2fd9386
make grpc_utils imports lazy
417f575
add BUILD.bazel
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
212 changes: 212 additions & 0 deletions
212
python/ray/_private/authentication/grpc_authentication_server_interceptor.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,212 @@ | ||
| """gRPC server interceptor for token-based authentication.""" | ||
|
|
||
| import logging | ||
| from typing import Awaitable, Callable | ||
|
|
||
| import grpc | ||
| from grpc import aio as aiogrpc | ||
|
|
||
| from ray._private.authentication.authentication_constants import ( | ||
| AUTHORIZATION_HEADER_NAME, | ||
| ) | ||
| from ray._private.authentication.authentication_utils import ( | ||
| is_token_auth_enabled, | ||
| validate_request_token, | ||
| ) | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def _authenticate_request(metadata: tuple) -> bool: | ||
| """Authenticate incoming request. currently only supports token authentication. | ||
|
|
||
| Args: | ||
| metadata: gRPC metadata tuple of (key, value) pairs | ||
| Returns: | ||
| True if authentication succeeds or is not required, False otherwise | ||
| """ | ||
| if not is_token_auth_enabled(): | ||
| return True | ||
|
|
||
| # Extract authorization header from metadata | ||
| auth_header = None | ||
| for key, value in metadata: | ||
| if key.lower() == AUTHORIZATION_HEADER_NAME: | ||
| auth_header = value | ||
| break | ||
|
|
||
| if not auth_header: | ||
| logger.warning("Authentication required but no authorization header provided") | ||
| return False | ||
|
|
||
| # Validate the token format and value | ||
| # validate_request_token returns bool (True if valid, False otherwise) | ||
| return validate_request_token(auth_header) | ||
|
|
||
|
|
||
| class AsyncAuthenticationServerInterceptor(aiogrpc.ServerInterceptor): | ||
| """Async gRPC server interceptor that validates authentication tokens. | ||
|
|
||
| This interceptor checks the "authorization" metadata header for a valid | ||
| Bearer token when token authentication is enabled via RAY_AUTH_MODE=token. | ||
| If the token is missing or invalid, the request is rejected with UNAUTHENTICATED status. | ||
| """ | ||
|
|
||
| async def intercept_service( | ||
| self, | ||
| continuation: Callable[ | ||
| [grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler] | ||
| ], | ||
| handler_call_details: grpc.HandlerCallDetails, | ||
| ) -> grpc.RpcMethodHandler: | ||
| """Intercept service calls to validate authentication. | ||
|
|
||
| This method is called once per RPC to get the handler. We wrap the handler | ||
| to validate authentication before executing the actual RPC method. | ||
| """ | ||
| # Get the actual handler | ||
| handler = await continuation(handler_call_details) | ||
|
|
||
| if handler is None: | ||
| return None | ||
|
|
||
| # Wrap the RPC behavior with authentication check | ||
| def wrap_rpc_behavior(behavior): | ||
| """Wrap an RPC method to validate authentication first.""" | ||
| if behavior is None: | ||
| return None | ||
|
|
||
| async def wrapped(request_or_iterator, context): | ||
| if not _authenticate_request(context.invocation_metadata()): | ||
| await context.abort( | ||
| grpc.StatusCode.UNAUTHENTICATED, | ||
| "Invalid or missing authentication token", | ||
| ) | ||
| return await behavior(request_or_iterator, context) | ||
|
|
||
| return wrapped | ||
|
|
||
| # Create a wrapper class that implements RpcMethodHandler interface | ||
| class AuthenticatedHandler: | ||
| """Wrapper handler that validates authentication.""" | ||
|
|
||
| def __init__(self, original_handler, wrapper_func): | ||
| self._original = original_handler | ||
| self._wrap = wrapper_func | ||
|
|
||
| @property | ||
| def request_streaming(self): | ||
| return self._original.request_streaming | ||
|
|
||
| @property | ||
| def response_streaming(self): | ||
| return self._original.response_streaming | ||
|
|
||
| @property | ||
| def request_deserializer(self): | ||
| return self._original.request_deserializer | ||
|
|
||
| @property | ||
| def response_serializer(self): | ||
| return self._original.response_serializer | ||
|
|
||
| @property | ||
| def unary_unary(self): | ||
| return self._wrap(self._original.unary_unary) | ||
|
|
||
| @property | ||
| def unary_stream(self): | ||
| return self._wrap(self._original.unary_stream) | ||
|
|
||
| @property | ||
| def stream_unary(self): | ||
| return self._wrap(self._original.stream_unary) | ||
|
|
||
| @property | ||
| def stream_stream(self): | ||
| return self._wrap(self._original.stream_stream) | ||
|
|
||
| return AuthenticatedHandler(handler, wrap_rpc_behavior) | ||
|
|
||
|
|
||
| class SyncAuthenticationServerInterceptor(grpc.ServerInterceptor): | ||
| """Synchronous gRPC server interceptor that validates authentication tokens. | ||
|
|
||
| This interceptor checks the "authorization" metadata header for a valid | ||
| Bearer token when token authentication is enabled via RAY_AUTH_MODE=token. | ||
| If the token is missing or invalid, the request is rejected with UNAUTHENTICATED status. | ||
| """ | ||
|
|
||
| def intercept_service( | ||
| self, | ||
| continuation: Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler], | ||
| handler_call_details: grpc.HandlerCallDetails, | ||
| ) -> grpc.RpcMethodHandler: | ||
| """Intercept service calls to validate authentication. | ||
|
|
||
| This method is called once per RPC to get the handler. We wrap the handler | ||
| to validate authentication before executing the actual RPC method. | ||
| """ | ||
| # Get the actual handler | ||
| handler = continuation(handler_call_details) | ||
|
|
||
| if handler is None: | ||
| return None | ||
|
|
||
| # Wrap the RPC behavior with authentication check | ||
| def wrap_rpc_behavior(behavior): | ||
| """Wrap an RPC method to validate authentication first.""" | ||
| if behavior is None: | ||
| return None | ||
|
|
||
| def wrapped(request_or_iterator, context): | ||
| if not _authenticate_request(context.invocation_metadata()): | ||
| context.abort( | ||
| grpc.StatusCode.UNAUTHENTICATED, | ||
| "Invalid or missing authentication token", | ||
| ) | ||
| return behavior(request_or_iterator, context) | ||
|
|
||
| return wrapped | ||
|
|
||
| # Create a wrapper class that implements RpcMethodHandler interface | ||
| class AuthenticatedHandler: | ||
| """Wrapper handler that validates authentication.""" | ||
|
|
||
| def __init__(self, original_handler, wrapper_func): | ||
| self._original = original_handler | ||
| self._wrap = wrapper_func | ||
|
|
||
| @property | ||
| def request_streaming(self): | ||
| return self._original.request_streaming | ||
|
|
||
| @property | ||
| def response_streaming(self): | ||
| return self._original.response_streaming | ||
|
|
||
| @property | ||
| def request_deserializer(self): | ||
| return self._original.request_deserializer | ||
|
|
||
| @property | ||
| def response_serializer(self): | ||
| return self._original.response_serializer | ||
|
|
||
| @property | ||
| def unary_unary(self): | ||
| return self._wrap(self._original.unary_unary) | ||
|
|
||
| @property | ||
| def unary_stream(self): | ||
| return self._wrap(self._original.unary_stream) | ||
|
|
||
| @property | ||
| def stream_unary(self): | ||
| return self._wrap(self._original.stream_unary) | ||
|
|
||
| @property | ||
| def stream_stream(self): | ||
| return self._wrap(self._original.stream_stream) | ||
|
|
||
| return AuthenticatedHandler(handler, wrap_rpc_behavior) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.