Closed
235 commits
acc5135
refactor aggregator agent to use async publisher event loop
Aug 20, 2025
7d97880
increase test coverage
Aug 20, 2025
526d57b
address gemini comments
Aug 20, 2025
478a64d
remove unused imports
Aug 20, 2025
e82f101
Support publishing events from aggregator to gcs
Aug 20, 2025
a067773
address gemini comments
Aug 20, 2025
9f012a1
address comments
Aug 21, 2025
5ad838d
Merge branch 'master' into aggrToGcs2
sampan-s-nayak Aug 21, 2025
f97954c
add new datastructure for event buffering and batching
Aug 26, 2025
6ab692c
refactor aggregator agent to run on single event loop [incomplete]
Aug 28, 2025
42cce0c
Merge branch 'master' into aggrToGcs2
sampan-s-nayak Aug 29, 2025
ffcc140
refactor aggregator agent logic + fix tests
Aug 29, 2025
ed8ea9e
fix lint
Aug 29, 2025
7a300e0
fix CI failures
Aug 29, 2025
36b0bc0
Merge remote-tracking branch 'upstream/aggrToGcs2' into aggrToGcs3
Aug 29, 2025
7a8d2be
rebase with parent pr
Aug 29, 2025
515d530
refactor changes
Sep 1, 2025
6cbfcba
Merge branch 'aggrToGcs2' into aggrToGcs3
sampan-s-nayak Sep 1, 2025
c08261b
refactor
Sep 1, 2025
5913272
improve doc strings
Sep 1, 2025
229595e
Merge branch 'aggrToGcs2' into aggrToGcs3
sampan-s-nayak Sep 1, 2025
26d8e0e
fix metrics test
Sep 1, 2025
e0c3cfa
add aggregator to GCS integration test
Sep 1, 2025
5d68897
parametrize task events tests to run with both existing and the new e…
Sep 2, 2025
a0d17e0
fix lint
Sep 2, 2025
cf7a1aa
fix comment formatting
Sep 2, 2025
b73cfeb
explicitly passtask names in test
Sep 2, 2025
286b15a
address comments
Sep 3, 2025
8e8e5be
Merge branch 'master' into aggrToGcs2
sampan-s-nayak Sep 3, 2025
f38d397
Merge remote-tracking branch 'upstream/aggrToGcs2' into aggrToGcs3
Sep 3, 2025
4c1f6e9
Merge branch 'master' into aggrToGcs2
sampan-s-nayak Sep 3, 2025
76e35c8
Merge branch 'aggrToGcs2' into aggrToGcs3
sampan-s-nayak Sep 3, 2025
284c28a
Merge branch 'master' into aggrToGcs2
sampan-s-nayak Sep 4, 2025
8331f11
Merge branch 'aggrToGcs2' into aggrToGcs3
sampan-s-nayak Sep 4, 2025
f224814
Merge branch 'master' into aggrToGcs2
sampan-s-nayak Sep 4, 2025
2456ac2
move metrics publishing to individual components instead of current c…
Sep 4, 2025
5a954ca
use ray metric utils instead of prometheus + update tests
Sep 5, 2025
cf8d094
increase publish interval seconds
Sep 5, 2025
730b644
improve tests
Sep 5, 2025
9b9d593
fix lint issues
Sep 8, 2025
a11a63d
Merge branch 'aggrToGcs2' into aggrToGcs3
sampan-s-nayak Sep 8, 2025
d81e654
use OpenTelemetryMetricRecorder for metrics
Sep 9, 2025
10c08a0
fix lint
Sep 9, 2025
55f18d6
Merge branch 'aggrToGcs2' into aggrToGcs3
sampan-s-nayak Sep 9, 2025
7941c23
fix lint error
Sep 9, 2025
15f1f50
fix doc string
Sep 9, 2025
e9f6aaf
remove unused imports
Sep 9, 2025
9e48fcd
Merge branch 'master' into aggrToGcs2
sampan-s-nayak Sep 10, 2025
c65fbcf
rebase with changes in master
Sep 10, 2025
5ce494b
Merge remote-tracking branch 'upstream/aggrToGcs2' into aggrToGcs3
Sep 10, 2025
7bae3ec
fix failing tests + minor refactoring
Sep 10, 2025
689faae
fix ruff linting issues
Sep 10, 2025
00af1bb
Merge remote-tracking branch 'upstream/aggrToGcs2' into aggrToGcs3
Sep 10, 2025
d7c6881
Merge branch 'master' into aggrToGcs2
sampan-s-nayak Sep 10, 2025
cb19e53
Merge branch 'aggrToGcs2' into aggrToGcs3
sampan-s-nayak Sep 10, 2025
2083b4b
address comments
Sep 12, 2025
87d364a
emit common metric instead of separate metric for each publisher
Sep 15, 2025
5806e64
Merge remote-tracking branch 'upstream/aggrToGcs2' into aggrToGcs3
Sep 15, 2025
6bef4f3
fix test
Sep 15, 2025
372f930
rename func
Sep 15, 2025
224e81e
move metrics definition to a separate file
Sep 15, 2025
b960b3b
fix lint
Sep 15, 2025
0ff60d6
Merge remote-tracking branch 'upstream/aggrToGcs2' into aggrToGcs3
Sep 15, 2025
49bd0c6
use uppercase for constants
Sep 17, 2025
edc766a
shutdown executor before stopping aggregator_agent
Sep 17, 2025
6ef3a44
fix lint error
Sep 17, 2025
d37c4f3
Merge remote-tracking branch 'upstream/aggrToGcs2' into aggrToGcs3
Sep 17, 2025
7239cf8
address comments
Sep 17, 2025
f3341e2
Merge branch 'master' into aggrToGcs2
sampan-s-nayak Sep 18, 2025
5f26289
support filtering in GCS publisher
Sep 18, 2025
291ee78
Merge remote-tracking branch 'upstream/aggrToGcs2' into aggrToGcs3
Sep 18, 2025
9a172be
update config
Sep 18, 2025
4d7e770
fix bugs
Sep 18, 2025
884e1c8
fix tests
Sep 22, 2025
241c29b
address comment
Sep 22, 2025
f306821
Merge branch 'master' into aggrToGcs2
sampan-s-nayak Sep 22, 2025
3a953de
Merge branch 'aggrToGcs2' into aggrToGcs3
sampan-s-nayak Sep 22, 2025
de8dc01
address comment
Sep 22, 2025
b47403a
fix test
Sep 22, 2025
721b3a4
fix test
Sep 22, 2025
3e2b86a
Merge branch 'aggrToGcs2' into aggrToGcs3
sampan-s-nayak Sep 22, 2025
2debf67
fix test
Sep 22, 2025
f30873b
Merge branch 'aggrToGcs2' into aggrToGcs3
sampan-s-nayak Sep 22, 2025
93effeb
ensure executor svc is properly shutdown
Sep 22, 2025
e87ddfa
Merge branch 'aggrToGcs2' into aggrToGcs3
sampan-s-nayak Sep 22, 2025
0c7b11a
fix lint error
Sep 22, 2025
dfb070e
address comments
Sep 23, 2025
a42a9de
address comment
Sep 23, 2025
759a34b
refactor code and improve readability
Sep 23, 2025
a7d85c8
fix doc string
Sep 23, 2025
f35f992
Merge remote-tracking branch 'upstream/aggrToGcs2' into aggrToGcs3
Sep 23, 2025
528b8d9
fix issues during merge
Sep 23, 2025
a645d7b
fix lint
Sep 23, 2025
c48a2fc
address comments
Sep 24, 2025
6cc75ff
Merge branch 'aggrToGcs2' into aggrToGcs3
sampan-s-nayak Sep 24, 2025
7b33887
fix env name
Sep 24, 2025
f925fd9
Merge branch 'aggrToGcs2' into aggrToGcs3
sampan-s-nayak Sep 24, 2025
191785f
ensure task_metadata_buffer size
Sep 24, 2025
1d32182
Run state api tests using both existing and new event aggregator base…
Sep 24, 2025
182b713
ensure aggregator is up before flushing events
Sep 25, 2025
c06b804
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Sep 25, 2025
4ce7dea
clean up core_worker_changes
Sep 25, 2025
857f5b6
remove debug print statement
Sep 25, 2025
c19e445
Merge branch 'master' into aggrToGcs2
sampan-s-nayak Sep 25, 2025
fb3976c
Merge remote-tracking branch 'upstream/aggrToGcs2' into aggrToGcs3
Sep 25, 2025
99682ce
fix improper merge issue
Sep 25, 2025
e9445c9
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Sep 25, 2025
22999ab
remove incorrect metric
Sep 25, 2025
6375fdb
Merge branch 'master' into aggrToGcs2
sampan-s-nayak Sep 25, 2025
315aecc
fix failing tests
Sep 25, 2025
b1ec24e
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Sep 25, 2025
e4a49b5
Merge branch 'aggrToGcs2' into aggrToGcs3
sampan-s-nayak Sep 25, 2025
deaf207
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Sep 25, 2025
fb7c5f8
use the right env variable
Sep 25, 2025
f30a81f
Merge remote-tracking branch 'upstream/aggrToGcs3' into aggrToGcs3
Sep 25, 2025
3cdec9c
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Sep 25, 2025
8b9ad69
Merge remote-tracking branch 'upstream' into aggrToGcs3
Sep 26, 2025
237e435
fix tests
Sep 26, 2025
ce97faf
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Sep 26, 2025
e981079
fix failing tests
Sep 26, 2025
16f8fd3
Merge branch 'master' into aggrToGcs3
sampan-s-nayak Sep 28, 2025
affc182
address comments
Sep 29, 2025
996880c
lint
Sep 29, 2025
81dd51b
Merge branch 'master' into aggrToGcs3
sampan-s-nayak Sep 29, 2025
8a0defd
fix comment
Sep 29, 2025
8804f5d
Merge branch 'master' into aggrToGcs3
sampan-s-nayak Sep 29, 2025
3e2c3ae
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Sep 30, 2025
d97b66c
revert testing changes to simplify pr
Sep 30, 2025
038f6ad
Merge branch 'master' into aggrToGcs3
sampan-s-nayak Sep 30, 2025
8bec10d
Run state api tests using both existing and new event aggregator base…
Sep 24, 2025
c9f777f
retain certain test modifications
Sep 30, 2025
b80757a
Merge remote-tracking branch 'upstream/aggrToGcs5' into aggrToGcs5
Sep 30, 2025
e28afc5
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Sep 30, 2025
1b442fe
address cursor comments
Sep 30, 2025
4f9afc2
Merge remote-tracking branch 'upstream/aggrToGcs3' into aggrToGcs3
Sep 30, 2025
42dd58c
deflake tests by not making any assumptions about event ordering
Sep 30, 2025
355eda0
fix lint
Sep 30, 2025
b0c344f
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Sep 30, 2025
47b524c
revert testing changes as problematic pr has been reverted
Oct 1, 2025
cfec389
address comments
Oct 6, 2025
1b766a6
Merge branch 'master' into aggrToGcs3
sampan-s-nayak Oct 6, 2025
4ad1653
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Oct 6, 2025
d15354c
fix flaky test
Oct 6, 2025
bb1ecc2
fix tests
Oct 6, 2025
60a1407
use the right env variable
Oct 6, 2025
bb3cf85
enable aggregator mode only on tests which need them
Oct 6, 2025
9196c21
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Oct 6, 2025
067173f
address comments
Oct 7, 2025
1a9fac9
run deserialization on executor if available
Oct 7, 2025
6d9efb4
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Oct 7, 2025
35e9b7d
Merge branch 'master' into aggrToGcs3
sampan-s-nayak Oct 7, 2025
4cfb10a
fix flakiness
Oct 7, 2025
3965f36
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Oct 7, 2025
ce6ee18
fix gcs publish tests
Oct 7, 2025
0b7f4ac
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Oct 8, 2025
6b90310
fix lint
Oct 8, 2025
c789052
add aggregator readiness check
Oct 8, 2025
1f5b6b6
avoid creating a new gcs client
Oct 8, 2025
3457d03
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Oct 8, 2025
6560a5c
do not import grpc
Oct 8, 2025
23fcb83
try fixing flaky test
Oct 8, 2025
f809548
fix lint
Oct 8, 2025
7a080e5
explicitly disable core worker to gcs pathway + fix task event func_o…
Oct 9, 2025
5f4f8cb
fix lint
Oct 9, 2025
fc6e3f6
fix parent task id
Oct 9, 2025
646e49b
address comments
Oct 10, 2025
0d1b7f3
Merge branch 'master' into aggrToGcs3
sampan-s-nayak Oct 13, 2025
c85dfba
address comments
Oct 13, 2025
2ddd90e
missed files
Oct 13, 2025
3e6e286
remove unused import
Oct 13, 2025
a1468aa
fix build errors
Oct 13, 2025
bfe880f
Update test_aggregator_agent.py
sampan-s-nayak Oct 13, 2025
aa77ace
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Oct 13, 2025
2512912
use the right env variable
Oct 14, 2025
98c9fd0
Merge remote-tracking branch 'upstream/master' into aggrToGcs3
Oct 16, 2025
bc08fa7
missed change
Oct 16, 2025
1c4af3b
use the right event type
Oct 16, 2025
bcb622c
Merge branch 'master' into aggrToGcs3
sampan-s-nayak Oct 29, 2025
3fd7864
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Oct 29, 2025
b4d7a18
enable aggregator to gcs
Oct 29, 2025
5e04c2b
fix lint
Oct 29, 2025
db3f3cc
fix lint issues
Oct 29, 2025
5b078b4
fix issue during merge
Oct 29, 2025
e5e159d
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Oct 29, 2025
3f9d8f7
Merge branch 'aggrToGcs5' into aggrToGcsPerfTest
sampan-s-nayak Oct 29, 2025
f0d0cb7
disable aggregator to gcs
Oct 30, 2025
8899b3b
Merge remote-tracking branch 'origin/aggrToGcsPerfTest' into aggrToGc…
Oct 30, 2025
1e4986d
Merge branch 'master' into aggrToGcs3
sampan-s-nayak Nov 6, 2025
0426625
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Nov 6, 2025
76a497d
fix lint + simplify _wait_for_aggregator_agent
Nov 6, 2025
cee246c
fix import
Nov 6, 2025
0a763e6
pass common_metrics_tags to TaskEventsMetadataBuffer
Nov 12, 2025
a019117
Merge branch 'master' into aggrToGcs3
sampan-s-nayak Nov 14, 2025
83bc972
Merge branch 'master' into aggrToGcs3
sampan-s-nayak Nov 14, 2025
2635265
enable token auth by default
Nov 17, 2025
07d395d
Merge branch 'master' into release-test-with-auth
sampan-s-nayak Nov 18, 2025
118d8f9
address comment
Nov 19, 2025
521232d
Merge remote-tracking branch 'upstream/master' into aggrToGcs3
Nov 19, 2025
6b0b2f6
fix lint issues
Nov 19, 2025
01856e8
Merge remote-tracking branch 'upstream/aggrToGcs3' into aggrToGcs3
Nov 19, 2025
6f6d0c4
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Nov 20, 2025
45d7acc
Merge branch 'aggrToGcs5' into aggrToGcsPerfTest
sampan-s-nayak Nov 20, 2025
feaeb14
Merge branch 'master' into release-test-with-auth
sampan-s-nayak Nov 21, 2025
e5d893a
Merge branch 'master' into aggrToGcs3
sampan-s-nayak Nov 21, 2025
08590f0
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Nov 21, 2025
411f84c
Merge branch 'aggrToGcs5' into aggrToGcsPerfTest
sampan-s-nayak Nov 21, 2025
4b308b4
enable aggregator to gcs code path
Nov 21, 2025
8a4986e
empty commit
Nov 24, 2025
c1caa33
handle api token auth
Nov 24, 2025
d402c8b
disable auth mode
Nov 24, 2025
07e21c7
Merge remote-tracking branch 'upstream/master' into aggrToGcs3
Nov 25, 2025
87dec14
fix lint
Nov 25, 2025
711f3c6
fix merge issues
Nov 25, 2025
ac15bcd
fix lint
Nov 25, 2025
6ad03ac
Merge branch 'aggrToGcs3' into aggrToGcs5
sampan-s-nayak Nov 25, 2025
ce0c93b
Merge branch 'aggrToGcs5' into aggrToGcsPerfTest
sampan-s-nayak Nov 25, 2025
afbcc3d
re-enable token auth
Nov 25, 2025
6c1e8e1
empty-commit
Nov 25, 2025
b12b52f
skip auth in dashboard grpc services
Nov 27, 2025
8364d63
fix lint
Nov 27, 2025
947c80f
fix lint
Nov 27, 2025
ed4e81d
Merge remote-tracking branch 'upstream/release-test-with-auth' into r…
Nov 27, 2025
70cfcb9
Merge branch 'master' into release-test-with-auth
sampan-s-nayak Dec 2, 2025
a22e0b1
fix grpc_authentication_server_interceptors streaming response handling
Dec 2, 2025
128f602
revert unnecessary changes
Dec 2, 2025
4905d00
remove unnecessary changes
Dec 2, 2025
3f5f03c
Merge branch 'master' into release-test-with-auth
sampan-s-nayak Dec 3, 2025
23f75c8
Merge branch 'master' into release-test-with-auth
sampan-s-nayak Dec 3, 2025
bb4ca89
Merge branch 'master' into release-test-with-auth
sampan-s-nayak Dec 4, 2025
2927a39
skip auth for c++ grpc servers as well
Dec 5, 2025
23ee92d
Merge remote-tracking branch 'upstream/release-test-with-auth' into r…
Dec 5, 2025
a5401a1
Merge branch 'master' into release-test-with-auth
sampan-s-nayak Dec 16, 2025
68b7948
optimise token logic
Dec 16, 2025
433b55b
empty commit
Dec 16, 2025
a4b9bf7
Merge branch 'master' into release-test-with-auth
sampan-s-nayak Dec 16, 2025
@@ -11,7 +11,6 @@
)
from ray._private.authentication.authentication_utils import (
is_token_auth_enabled,
validate_request_token,
)

logger = logging.getLogger(__name__)
@@ -29,19 +28,10 @@ def _authenticate_request(metadata: tuple) -> bool:
return True

# Extract authorization header from metadata
auth_header = None
for key, value in metadata:
if key.lower() == AUTHORIZATION_HEADER_NAME:
auth_header = value
break

if not auth_header:
logger.warning("Authentication required but no authorization header provided")
return False

# Validate the token format and value
# validate_request_token returns bool (True if valid, False otherwise)
return validate_request_token(auth_header)
return True


class AsyncAuthenticationServerInterceptor(aiogrpc.ServerInterceptor):
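
The header lookup shown in the `_authenticate_request` hunk above can be sketched in isolation. This is a minimal standalone illustration, not Ray's implementation: `find_authorization_header` is a hypothetical helper, and the value `"authorization"` for `AUTHORIZATION_HEADER_NAME` is an assumption, since the constant's definition does not appear in this diff.

```python
from typing import Iterable, Optional, Tuple

# Assumed value; the real constant is defined elsewhere and is not shown in this diff.
AUTHORIZATION_HEADER_NAME = "authorization"


def find_authorization_header(
    metadata: Iterable[Tuple[str, str]],
) -> Optional[str]:
    """Return the first authorization value in gRPC-style metadata, or None."""
    for key, value in metadata:
        # Compare case-insensitively, as the loop in the diff does with key.lower().
        if key.lower() == AUTHORIZATION_HEADER_NAME:
            return value
    return None
```

A caller would treat a `None` result as "no authorization header provided", which is the case the diff logs a warning for.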
16 changes: 11 additions & 5 deletions python/ray/_private/authentication/http_token_authentication.py
@@ -59,7 +59,8 @@ async def token_auth_middleware(request, handler):

if not auth_header:
token = request.cookies.get(
authentication_constants.AUTHENTICATION_TOKEN_COOKIE_NAME
authentication_constants.AUTHENTICATION_TOKEN_COOKIE_NAME,
"f50f7c101ea8484c8acb67f6129e1f46",
)
if token:
# Format as Bearer token for validation
@@ -68,13 +69,18 @@ async def token_auth_middleware(request, handler):
)

if not auth_header:
return aiohttp_module.web.Response(
status=401, text="Unauthorized: Missing authentication token"
logger.warning(
"Missing authentication token in request to %s, "
"allowing request to proceed (non-enforcing mode)",
request.path,
)
return await handler(request)

if not auth_utils.validate_request_token(auth_header):
return aiohttp_module.web.Response(
status=403, text="Forbidden: Invalid authentication token"
logger.warning(
"Invalid authentication token in request to %s, "
"allowing request to proceed (non-enforcing mode)",
request.path,
)

return await handler(request)
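
The behavioral change in the `token_auth_middleware` hunks above, from rejecting a request to logging a warning and letting it through, can be illustrated with a small stdlib-only sketch. Everything in it is hypothetical: `decide`, the `enforce` flag, and the `is_valid` callback are illustration names, not part of Ray or aiohttp. With `enforce=True` the function mirrors the 401/403 responses in the old lines; with `enforce=False` it mirrors the non-enforcing path in the new lines.

```python
import logging
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def decide(
    auth_header: Optional[str],
    is_valid: Callable[[str], bool],
    path: str,
    enforce: bool,
) -> Optional[int]:
    """Return an HTTP error status, or None if the request should reach the handler."""
    if not auth_header:
        if enforce:
            return 401  # Unauthorized: missing authentication token
        logger.warning("Missing authentication token in request to %s", path)
        return None
    if not is_valid(auth_header):
        if enforce:
            return 403  # Forbidden: invalid authentication token
        logger.warning("Invalid authentication token in request to %s", path)
        return None
    return None
```

In non-enforcing mode every request reaches the handler, so the only observable effect of a missing or invalid token is the log line.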
1 change: 0 additions & 1 deletion python/ray/autoscaler/v2/tests/test_e2e.py
@@ -317,7 +317,6 @@ def test_placement_group_removal_idle_node(autoscaler_v2):

def verify():
cluster_state = get_cluster_status(gcs_address)

# Verify that nodes are idle.
assert len((cluster_state.idle_nodes)) == 3
for node in cluster_state.idle_nodes:
5 changes: 3 additions & 2 deletions python/ray/includes/rpc_token_authentication.pxd
@@ -1,4 +1,5 @@
from libcpp cimport bool as c_bool
from libcpp.memory cimport shared_ptr
from libcpp.string cimport string
from ray.includes.optional cimport optional

@@ -32,11 +33,11 @@ cdef extern from "ray/rpc/authentication/authentication_token_loader.h" namespac
@staticmethod
CAuthenticationTokenLoader& instance()
void ResetCache()
optional[CAuthenticationToken] GetToken(c_bool ignore_auth_mode)
shared_ptr[const CAuthenticationToken] GetToken(c_bool ignore_auth_mode)
CTokenLoadResult TryLoadToken(c_bool ignore_auth_mode)

cdef extern from "ray/rpc/authentication/authentication_token_validator.h" namespace "ray::rpc" nogil:
cdef cppclass CAuthenticationTokenValidator "ray::rpc::AuthenticationTokenValidator":
@staticmethod
CAuthenticationTokenValidator& instance()
c_bool ValidateToken(const optional[CAuthenticationToken]& expected_token, const CAuthenticationToken& provided_token)
c_bool ValidateToken(const shared_ptr[const CAuthenticationToken]& expected_token, const string& provided_metadata)
44 changes: 22 additions & 22 deletions python/ray/includes/rpc_token_authentication.pxi
@@ -1,4 +1,5 @@
from libcpp cimport bool as c_bool
from libcpp.memory cimport shared_ptr
from ray.includes.rpc_token_authentication cimport (
CAuthenticationMode,
GetAuthenticationMode,
@@ -29,33 +30,30 @@ def get_authentication_mode():
return GetAuthenticationMode()


def validate_authentication_token(provided_token: str) -> bool:
def validate_authentication_token(provided_metadata: str) -> bool:
"""Validate provided authentication token.

For TOKEN mode, compares against the expected token.
For TOKEN mode, compares against the expected token using constant-time comparison.
For K8S mode, validates against the Kubernetes API.

Args:
provided_token: Full authorization header value (e.g., "Bearer <token>")
provided_metadata: Full authorization header value (e.g., "Bearer <token>")

Returns:
bool: True if token is valid, False otherwise
"""
cdef optional[CAuthenticationToken] expected_opt
cdef CAuthenticationToken provided
cdef shared_ptr[const CAuthenticationToken] expected_ptr

if get_authentication_mode() == CAuthenticationMode.TOKEN:
expected_opt = CAuthenticationTokenLoader.instance().GetToken(False)
if not expected_opt.has_value():
expected_ptr = CAuthenticationTokenLoader.instance().GetToken(False)
if not expected_ptr:
return False

# Parse provided token from Bearer format
provided = CAuthenticationToken.FromMetadata(provided_token.encode())

if provided.empty():
return False

return CAuthenticationTokenValidator.instance().ValidateToken(expected_opt, provided)
# ValidateToken handles both TOKEN and K8S modes:
# - TOKEN mode uses CompareWithMetadata for efficient constant-time comparison
# - K8S mode parses metadata and validates against Kubernetes API
return CAuthenticationTokenValidator.instance().ValidateToken(
expected_ptr, provided_metadata.encode())


class AuthenticationTokenLoader:
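
The `validate_authentication_token` docstring above mentions constant-time comparison for TOKEN mode. In Python, the standard library provides this through `hmac.compare_digest`. The sketch below is only an illustration under stated assumptions, not the C++ `CompareWithMetadata` logic: `validate_bearer` is a hypothetical function, and the `"Bearer "` prefix handling is assumed from the docstring's `"Bearer <token>"` example.

```python
import hmac

# Assumed header format, taken from the docstring's "Bearer <token>" example.
BEARER_PREFIX = "Bearer "


def validate_bearer(expected_token: str, provided_metadata: str) -> bool:
    """Check a 'Bearer <token>' header value against the expected token."""
    if not expected_token or not provided_metadata.startswith(BEARER_PREFIX):
        return False
    provided = provided_metadata[len(BEARER_PREFIX):]
    # compare_digest is designed to avoid leaking where the first mismatch occurs.
    return hmac.compare_digest(expected_token.encode(), provided.encode())
```

A plain `==` on strings can return as soon as it finds a differing character, which is why a dedicated comparison function is the usual choice for secrets.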
@@ -120,13 +118,14 @@ class AuthenticationTokenLoader:
if not self.has_token(ignore_auth_mode):
return {}

# Get the token from C++ layer
cdef optional[CAuthenticationToken] token_opt = CAuthenticationTokenLoader.instance().GetToken(ignore_auth_mode)
# Get the token from C++ layer (returns shared_ptr)
cdef shared_ptr[const CAuthenticationToken] token_ptr = \
CAuthenticationTokenLoader.instance().GetToken(ignore_auth_mode)

if not token_opt.has_value() or token_opt.value().empty():
if not token_ptr or token_ptr.get().empty():
return {}

return {AUTHORIZATION_HEADER_NAME: token_opt.value().ToAuthorizationHeaderValue().decode('utf-8')}
return {AUTHORIZATION_HEADER_NAME: token_ptr.get().ToAuthorizationHeaderValue().decode('utf-8')}

def get_raw_token(self, ignore_auth_mode=False) -> str:
"""Get the raw authentication token value.
@@ -141,10 +140,11 @@ class AuthenticationTokenLoader:
if not self.has_token(ignore_auth_mode):
return ""

# Get the token from C++ layer
cdef optional[CAuthenticationToken] token_opt = CAuthenticationTokenLoader.instance().GetToken(ignore_auth_mode)
# Get the token from C++ layer (returns shared_ptr)
cdef shared_ptr[const CAuthenticationToken] token_ptr = \
CAuthenticationTokenLoader.instance().GetToken(ignore_auth_mode)

if not token_opt.has_value() or token_opt.value().empty():
if not token_ptr or token_ptr.get().empty():
return ""

return token_opt.value().GetRawValue().decode('utf-8')
return token_ptr.get().GetRawValue().decode('utf-8')
2 changes: 1 addition & 1 deletion src/ray/common/ray_config_def.h
@@ -39,7 +39,7 @@ RAY_CONFIG(bool, enable_cluster_auth, true)
/// will be converted to AuthenticationMode enum defined in
/// rpc/authentication/authentication_mode.h
/// use GetAuthenticationMode() to get the authentication mode enum value.
RAY_CONFIG(std::string, AUTH_MODE, "disabled")
RAY_CONFIG(std::string, AUTH_MODE, "token")

/// The interval of periodic event loop stats print.
/// -1 means the feature is disabled. In this case, stats are available
2 changes: 1 addition & 1 deletion src/ray/core_worker/grpc_service.cc
@@ -25,7 +25,7 @@ void CoreWorkerGrpcService::InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) {
std::shared_ptr<const AuthenticationToken> auth_token) {
/// TODO(vitsai): Remove this when auth is implemented for node manager.
/// Disable gRPC server metrics since it incurs too high cardinality.
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
2 changes: 1 addition & 1 deletion src/ray/core_worker/grpc_service.h
@@ -162,7 +162,7 @@ class CoreWorkerGrpcService : public GrpcService {
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) override;
std::shared_ptr<const AuthenticationToken> auth_token) override;

private:
CoreWorkerService::AsyncService service_;
24 changes: 12 additions & 12 deletions src/ray/gcs/grpc_services.cc
@@ -23,7 +23,7 @@ void ActorInfoGrpcService::InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) {
std::shared_ptr<const AuthenticationToken> auth_token) {
/// The register & create actor RPCs take a long time, so we shouldn't limit their
/// concurrency to avoid distributed deadlock.
RPC_SERVICE_HANDLER(ActorInfoGcsService, RegisterActor, -1)
@@ -44,7 +44,7 @@ void NodeInfoGrpcService::InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) {
std::shared_ptr<const AuthenticationToken> auth_token) {
// We only allow one cluster ID in the lifetime of a client.
// So, if a client connects, it should not have a pre-existing different ID.
RPC_SERVICE_HANDLER_CUSTOM_AUTH(NodeInfoGcsService,
@@ -64,7 +64,7 @@ void NodeResourceInfoGrpcService::InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) {
std::shared_ptr<const AuthenticationToken> auth_token) {
RPC_SERVICE_HANDLER(
NodeResourceInfoGcsService, GetAllAvailableResources, max_active_rpcs_per_handler_)
RPC_SERVICE_HANDLER(
@@ -79,7 +79,7 @@ void InternalPubSubGrpcService::InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) {
std::shared_ptr<const AuthenticationToken> auth_token) {
RPC_SERVICE_HANDLER(InternalPubSubGcsService, GcsPublish, max_active_rpcs_per_handler_);
RPC_SERVICE_HANDLER(
InternalPubSubGcsService, GcsSubscriberPoll, max_active_rpcs_per_handler_);
@@ -91,7 +91,7 @@ void JobInfoGrpcService::InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) {
std::shared_ptr<const AuthenticationToken> auth_token) {
RPC_SERVICE_HANDLER(JobInfoGcsService, AddJob, max_active_rpcs_per_handler_)
RPC_SERVICE_HANDLER(JobInfoGcsService, MarkJobFinished, max_active_rpcs_per_handler_)
RPC_SERVICE_HANDLER(JobInfoGcsService, GetAllJobInfo, max_active_rpcs_per_handler_)
@@ -103,7 +103,7 @@ void RuntimeEnvGrpcService::InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) {
std::shared_ptr<const AuthenticationToken> auth_token) {
RPC_SERVICE_HANDLER(
RuntimeEnvGcsService, PinRuntimeEnvURI, max_active_rpcs_per_handler_)
}
@@ -112,7 +112,7 @@ void WorkerInfoGrpcService::InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) {
std::shared_ptr<const AuthenticationToken> auth_token) {
RPC_SERVICE_HANDLER(
WorkerInfoGcsService, ReportWorkerFailure, max_active_rpcs_per_handler_)
RPC_SERVICE_HANDLER(WorkerInfoGcsService, GetWorkerInfo, max_active_rpcs_per_handler_)
@@ -129,7 +129,7 @@ void InternalKVGrpcService::InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) {
std::shared_ptr<const AuthenticationToken> auth_token) {
RPC_SERVICE_HANDLER(InternalKVGcsService, InternalKVGet, max_active_rpcs_per_handler_)
RPC_SERVICE_HANDLER(
InternalKVGcsService, InternalKVMultiGet, max_active_rpcs_per_handler_)
@@ -146,7 +146,7 @@ void TaskInfoGrpcService::InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) {
std::shared_ptr<const AuthenticationToken> auth_token) {
RPC_SERVICE_HANDLER(TaskInfoGcsService, AddTaskEventData, max_active_rpcs_per_handler_)
RPC_SERVICE_HANDLER(TaskInfoGcsService, GetTaskEvents, max_active_rpcs_per_handler_)
}
@@ -155,7 +155,7 @@ void PlacementGroupInfoGrpcService::InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) {
std::shared_ptr<const AuthenticationToken> auth_token) {
RPC_SERVICE_HANDLER(
PlacementGroupInfoGcsService, CreatePlacementGroup, max_active_rpcs_per_handler_)
RPC_SERVICE_HANDLER(
@@ -177,7 +177,7 @@ void AutoscalerStateGrpcService::InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) {
std::shared_ptr<const AuthenticationToken> auth_token) {
RPC_SERVICE_HANDLER(
AutoscalerStateService, GetClusterResourceState, max_active_rpcs_per_handler_)
RPC_SERVICE_HANDLER(
@@ -200,7 +200,7 @@ void RayEventExportGrpcService::InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) {
std::shared_ptr<const AuthenticationToken> auth_token) {
RPC_SERVICE_HANDLER(RayEventExportGcsService, AddEvents, max_active_rpcs_per_handler_)
}
