Skip to content
Merged
Show file tree
Hide file tree
Changes from 67 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
ce73705
Add authentication token logic and related tests
Oct 23, 2025
341b108
Add gRPC service and server logic with auth integration tests
Oct 23, 2025
c821c21
revert unneeded changs from src/ray/rpc/tests/BUILD.bazel
Oct 23, 2025
a14dc69
readd dependencies
Oct 23, 2025
7834733
Merge branch 'master' into token_auth_1
sampan-s-nayak Oct 24, 2025
4801ed7
address comments + fix build
Oct 24, 2025
65c3ded
Merge branch 'token_auth_1' into token_auth_2
sampan-s-nayak Oct 24, 2025
d24f23c
address comments
Oct 24, 2025
d801db6
Merge branch 'master' into token_auth_2
edoakes Oct 24, 2025
f6017a0
[Core] token auth support in bidi-syncer and pubsub rpc
Oct 26, 2025
b8bec0c
fix lint
Oct 26, 2025
1ca6f2f
add missing import
Oct 26, 2025
e9cc57f
address comments
Oct 26, 2025
f8c08e0
fix lint
Oct 26, 2025
b128e4e
Merge remote-tracking branch 'upstream/token_auth_2' into token_auth_2
Oct 26, 2025
c8cff1d
Merge branch 'master' into token_auth_2
sampan-s-nayak Oct 26, 2025
a7a8efa
fix ci
Oct 26, 2025
5a91771
Merge remote-tracking branch 'upstream/token_auth_2' into token_auth_2
Oct 26, 2025
5910ecf
fix build.bazel and imports
Oct 27, 2025
d36e22f
fix lint
Oct 27, 2025
4063d74
fix lint issues
Oct 27, 2025
babc20f
Merge branch 'token_auth_2' into token_auth_3
sampan-s-nayak Oct 27, 2025
63273bd
fix ray_syncer tests
Oct 27, 2025
12c7c04
add pub-sub test
Oct 27, 2025
ae9345b
fix lint
Oct 27, 2025
9ac5eff
[Core] Support token auth in ray Pub-Sub
Oct 27, 2025
1a571ed
Merge branch 'token_auth_2.5' into token_auth_3
sampan-s-nayak Oct 27, 2025
10eb3b0
get rid of getToken()
Oct 27, 2025
312522b
Merge remote-tracking branch 'upstream/token_auth_3' into token_auth_3
Oct 27, 2025
e3b8c3f
address comments
Oct 27, 2025
cd0f933
fix
edoakes Oct 27, 2025
199d18e
fix
edoakes Oct 27, 2025
537e90a
fix
edoakes Oct 27, 2025
b5f2143
Merge branch 'token_auth_2.5' into token_auth_3
sampan-s-nayak Oct 28, 2025
17601c8
Merge branch 'master' into token_auth_2.5
sampan-s-nayak Oct 28, 2025
3bc34f2
[Core] Introduce new macros for user facing exceptions
Oct 28, 2025
e5b90ba
fix lint issues
Oct 28, 2025
acd95ac
dont print stack trace for user errors
Oct 28, 2025
c5be15f
address comments
Oct 28, 2025
f7f4ba2
Merge branch 'user_error_macro' into token_auth_2.5
sampan-s-nayak Oct 28, 2025
2698b8d
use RAY_USER_CHECK instead of RAY_CHECK + fix test
Oct 28, 2025
8572c01
fix lint
Oct 28, 2025
d47ae2b
improve test
Oct 28, 2025
d054131
fix lint
Oct 28, 2025
2ee5555
fix lint
Oct 28, 2025
ee6e775
Merge branch 'token_auth_2.5' into token_auth_3
sampan-s-nayak Oct 28, 2025
06d1773
fix builds
Oct 28, 2025
cc69ae3
lint
Oct 28, 2025
06a71b4
Merge branch 'master' into user_error_macro
sampan-s-nayak Oct 28, 2025
8b4fc91
Merge branch 'user_error_macro' into token_auth_2.5
sampan-s-nayak Oct 28, 2025
9f0a563
attempt to fix test
Oct 28, 2025
15aa5e2
attempt to fix test
Oct 28, 2025
47bb5b0
Merge branch 'token_auth_2.5' into token_auth_3
sampan-s-nayak Oct 28, 2025
1c600e6
fix test
Oct 29, 2025
f23ea2e
revert logging changes
Oct 31, 2025
4646909
address comments
Oct 31, 2025
1274e74
attempt to fix test
Oct 28, 2025
fe6bab4
[Core] Support token auth in ray Pub-Sub (#58186)
sampan-s-nayak Oct 29, 2025
75d19b7
Merge branch 'user_error_macro' into token_auth_3
sampan-s-nayak Oct 30, 2025
78f8431
Merge branch 'token_auth_2.5' into token_auth_3
sampan-s-nayak Oct 31, 2025
4e484ac
Merge remote-tracking branch 'upstream/token_auth_3' into token_auth_3
Oct 31, 2025
a2584d4
Merge remote-tracking branch 'upstream/master' into token_auth_3
Nov 1, 2025
82d0b7c
fix build
Nov 1, 2025
b64bd44
fix lint
Nov 1, 2025
f27a39a
fix build
Nov 2, 2025
b8f2d63
fix issue during merge conflict
Nov 2, 2025
6d1a9c6
revert unneeded changes
Nov 3, 2025
80c7743
address cursor comment
Nov 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ray/gcs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ ray_cc_library(
"//src/ray/raylet_rpc_client:raylet_client_pool",
"//src/ray/rpc:grpc_server",
"//src/ray/rpc:metrics_agent_client",
"//src/ray/rpc/authentication:authentication_token_loader",
"//src/ray/util:counter_map",
"//src/ray/util:exponential_backoff",
"//src/ray/util:network_util",
Expand Down
4 changes: 3 additions & 1 deletion src/ray/gcs/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "ray/observability/metric_constants.h"
#include "ray/pubsub/publisher.h"
#include "ray/raylet_rpc_client/raylet_client.h"
#include "ray/rpc/authentication/authentication_token_loader.h"
#include "ray/stats/stats.h"
#include "ray/util/network_util.h"

Expand Down Expand Up @@ -615,7 +616,8 @@ void GcsServer::InitRaySyncer(const GcsInitData &gcs_init_data) {
syncer::MessageType::RESOURCE_VIEW, nullptr, gcs_resource_manager_.get());
ray_syncer_->Register(
syncer::MessageType::COMMANDS, nullptr, gcs_resource_manager_.get());
rpc_server_.RegisterService(std::make_unique<syncer::RaySyncerService>(*ray_syncer_));
rpc_server_.RegisterService(std::make_unique<syncer::RaySyncerService>(
*ray_syncer_, ray::rpc::AuthenticationTokenLoader::instance().GetToken()));
}

void GcsServer::InitFunctionManager() {
Expand Down
3 changes: 3 additions & 0 deletions src/ray/ray_syncer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ ray_cc_library(
],
deps = [
"//src/ray/common:asio",
"//src/ray/common:constants",
"//src/ray/common:id",
"//src/ray/protobuf:ray_syncer_cc_grpc",
"//src/ray/rpc/authentication:authentication_token",
"//src/ray/rpc/authentication:authentication_token_loader",
"@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/container:flat_hash_map",
],
Expand Down
3 changes: 2 additions & 1 deletion src/ray/ray_syncer/ray_syncer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ ServerBidiReactor *RaySyncerService::StartSync(grpc::CallbackServerContext *cont
}
RAY_LOG(INFO).WithField(NodeID::FromBinary(node_id)) << "Connection is broken.";
syncer_.node_state_->RemoveNode(node_id);
});
},
/*auth_token=*/auth_token_);
RAY_LOG(DEBUG).WithField(NodeID::FromBinary(reactor->GetRemoteNodeID()))
<< "Get connection";
// Disconnect exiting connection if there is any.
Expand Down
10 changes: 9 additions & 1 deletion src/ray/ray_syncer/ray_syncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/container/flat_hash_map.h"
Expand All @@ -28,6 +29,7 @@
#include "ray/common/asio/periodical_runner.h"
#include "ray/common/id.h"
#include "ray/ray_syncer/common.h"
#include "ray/rpc/authentication/authentication_token.h"
#include "src/ray/protobuf/ray_syncer.grpc.pb.h"

namespace ray::syncer {
Expand Down Expand Up @@ -197,14 +199,20 @@ class RaySyncer {
/// like tree-based one.
class RaySyncerService : public ray::rpc::syncer::RaySyncer::CallbackService {
public:
explicit RaySyncerService(RaySyncer &syncer) : syncer_(syncer) {}
explicit RaySyncerService(
RaySyncer &syncer,
std::optional<ray::rpc::AuthenticationToken> auth_token = std::nullopt)
: syncer_(syncer), auth_token_(std::move(auth_token)) {}

grpc::ServerBidiReactor<RaySyncMessage, RaySyncMessage> *StartSync(
grpc::CallbackServerContext *context) override;

private:
// The ray syncer this RPC wrappers of.
RaySyncer &syncer_;
// Authentication token for validation, will be empty if token authentication is
// disabled
std::optional<ray::rpc::AuthenticationToken> auth_token_;
};

} // namespace ray::syncer
7 changes: 7 additions & 0 deletions src/ray/ray_syncer/ray_syncer_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <string>
#include <utility>

#include "ray/rpc/authentication/authentication_token_loader.h"

namespace ray::syncer {

RayClientBidiReactor::RayClientBidiReactor(
Expand All @@ -32,6 +34,11 @@ RayClientBidiReactor::RayClientBidiReactor(
cleanup_cb_(std::move(cleanup_cb)),
stub_(std::move(stub)) {
client_context_.AddMetadata("node_id", NodeID::FromBinary(local_node_id).Hex());
// Add authentication token if token authentication is enabled
auto auth_token = ray::rpc::AuthenticationTokenLoader::instance().GetToken();
if (auth_token.has_value() && !auth_token->empty()) {
auth_token->SetMetadata(client_context_);
}
stub_->async()->StartSync(&client_context_, this);
// Prevent this call from being terminated.
// Check https://github.com/grpc/proposal/blob/master/L67-cpp-callback-api.md
Expand Down
32 changes: 30 additions & 2 deletions src/ray/ray_syncer/ray_syncer_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <string>
#include <utility>

#include "ray/common/constants.h"

namespace ray::syncer {

namespace {
Expand All @@ -35,13 +37,39 @@ RayServerBidiReactor::RayServerBidiReactor(
instrumented_io_context &io_context,
const std::string &local_node_id,
std::function<void(std::shared_ptr<const RaySyncMessage>)> message_processor,
std::function<void(RaySyncerBidiReactor *, bool)> cleanup_cb)
std::function<void(RaySyncerBidiReactor *, bool)> cleanup_cb,
const std::optional<ray::rpc::AuthenticationToken> &auth_token)
: RaySyncerBidiReactorBase<ServerBidiReactor>(
io_context,
GetNodeIDFromServerContext(server_context),
std::move(message_processor)),
cleanup_cb_(std::move(cleanup_cb)),
server_context_(server_context) {
server_context_(server_context),
auth_token_(auth_token) {
if (auth_token_.has_value() && !auth_token_->empty()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm why do we check both has_value and empty? seems redundant -- if the optional has a value, we should always check the token

Copy link
Contributor Author

@sampan-s-nayak sampan-s-nayak Oct 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is to handle the case where AuthToken is present but it is an empty string (auth token loader should catch and throw an error in this scenario but added this extra check here just in case)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can also be used as a way to disable token auth for a specific service without disabling for all. if we pass an AuthToken obj with empty string then we skip token auth within that service (this is the same behaviour for both raySyncer and grpcServer base class)

// Validate authentication token
const auto &metadata = server_context->client_metadata();
auto it = metadata.find(kAuthTokenKey);
if (it == metadata.end()) {
RAY_LOG(WARNING) << "Missing authorization header in syncer connection from node "
<< NodeID::FromBinary(GetRemoteNodeID());
Finish(grpc::Status(grpc::StatusCode::UNAUTHENTICATED,
"Missing authorization header"));
return;
}

const std::string_view header(it->second.data(), it->second.length());
ray::rpc::AuthenticationToken provided_token =
ray::rpc::AuthenticationToken::FromMetadata(header);

if (!auth_token_->Equals(provided_token)) {
RAY_LOG(WARNING) << "Invalid bearer token in syncer connection from node "
<< NodeID::FromBinary(GetRemoteNodeID());
Finish(grpc::Status(grpc::StatusCode::UNAUTHENTICATED, "Invalid bearer token"));
return;
}
}

// Send the local node id to the remote
server_context_->AddInitialMetadata("node_id", NodeID::FromBinary(local_node_id).Hex());
StartSendInitialMetadata();
Expand Down
10 changes: 9 additions & 1 deletion src/ray/ray_syncer/ray_syncer_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

#include <gtest/gtest_prod.h>

#include <optional>
#include <string>

#include "ray/ray_syncer/common.h"
#include "ray/ray_syncer/ray_syncer_bidi_reactor.h"
#include "ray/ray_syncer/ray_syncer_bidi_reactor_base.h"
#include "ray/rpc/authentication/authentication_token.h"

namespace ray::syncer {

Expand All @@ -35,7 +37,8 @@ class RayServerBidiReactor : public RaySyncerBidiReactorBase<ServerBidiReactor>
instrumented_io_context &io_context,
const std::string &local_node_id,
std::function<void(std::shared_ptr<const RaySyncMessage>)> message_processor,
std::function<void(RaySyncerBidiReactor *, bool)> cleanup_cb);
std::function<void(RaySyncerBidiReactor *, bool)> cleanup_cb,
const std::optional<ray::rpc::AuthenticationToken> &auth_token);

~RayServerBidiReactor() override = default;

Expand All @@ -49,6 +52,11 @@ class RayServerBidiReactor : public RaySyncerBidiReactorBase<ServerBidiReactor>

/// grpc callback context
grpc::CallbackServerContext *server_context_;

/// Authentication token for validation, will be empty if token authentication is
/// disabled
std::optional<ray::rpc::AuthenticationToken> auth_token_;

FRIEND_TEST(SyncerReactorTest, TestReactorFailure);
};

Expand Down
1 change: 1 addition & 0 deletions src/ray/ray_syncer/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ray_cc_test(
"//src/mock/ray/ray_syncer:mock_ray_syncer",
"//src/ray/ray_syncer",
"//src/ray/rpc:grpc_server",
"//src/ray/rpc/authentication:authentication_token",
"//src/ray/util:network_util",
"//src/ray/util:path_utils",
"//src/ray/util:raii",
Expand Down
Loading