
Conversation

@chiayi (Contributor) commented Dec 3, 2025

Co-authored-by: @chiayi [email protected]
Co-authored-by: @KunWuLuan [email protected]

Why are these changes needed?

This PR adds the event server part of the history server, which constructs tasks/actors from Ray events.

Related issue number

Related to #3884

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

@chiayi (Contributor, Author) commented Dec 3, 2025

Manually tested using @KunWuLuan's history server and collector. This assumes events are stored under root/<clusterName>/<session_name>/node_events/* and root/<clusterName>/<session_name>/job_events/<job-*>/*, with each file formatted as an array of events, e.g.:

[
	{
		"eventId":"kqf2gaf7DDgPEiskA/j25uel9Hwv2jZSdkA+Rg==",
		"eventType":"TASK_DEFINITION_EVENT",
		"message":"",
		"sessionName":"session_2025-12-02_13-07-25_921175_1",
		"severity":"INFO",
		"sourceType":"CORE_WORKER",
		"taskDefinitionEvent":{
			"jobId":"AQAAAA==",
			"language":"PYTHON",
			"parentTaskId":"////////////////////////////////",
			"placementGroupId":"////////////////////////",
			"refIds":{
			},
			"requiredResources":{
			},
			"serializedRuntimeEnv":"",
			"taskAttempt":0,
			"taskFunc":{
			},
			"taskId":"//////////////////////////8BAAAA",
			"taskName":"",
			"taskType":"DRIVER_TASK"
		},
		"timestamp":"2025-12-02T21:11:35.818441374Z"
	},
	{
		"eventId":"oY256kANCK8YnJNZCKsBvZV2Cq/y1rOTRzaktw==",
		"eventType":"TASK_LIFECYCLE_EVENT",
		"message":"",
		"sessionName":"session_2025-12-02_13-07-25_921175_1",
		"severity":"INFO",
		"sourceType":"CORE_WORKER",
		"taskLifecycleEvent":{
			"jobId":"",
			"nodeId":"",
			"stateTransitions":[
				{
					"state":"RUNNING",
					"timestamp":"2025-12-02T21:11:35.818441374Z"
				}
			],
			"taskAttempt":0,
			"taskId":"//////////////////////////8BAAAA",
			"workerId":"",
			"workerPid":0
		},
		"timestamp":"2025-12-02T21:11:35.818441374Z"
	}
]

Running the event server reads these files and turns them into task objects. I've also temporarily replaced the history server endpoint and the getTaskDetail() function to return the unformatted list of tasks. Running curl localhost:8080/api/v0/tasks?filter_keys=task_id returns:

[
	{
		"taskId":"//////////////////////////8BAAAA",
		"taskName":"",
		"taskAttempt":0,
		"State":"",
		"jobId":"AQAAAA==",
		"nodeId":"",
		"ActorID":"",
		"placementGroupId":"////////////////////////",
		"taskType":"DRIVER_TASK",
		"functionName":"",
		"language":"PYTHON",
		"requiredResources":{
		},
		"StartTime":"0001-01-01T00:00:00Z",
		"EndTime":"0001-01-01T00:00:00Z",
		"workerId":"",
		"errorType":"",
		"errorMessage":"",
		"TaskLogInfo":null,
		"CallSite":"",
		"LabelSelector":null
	},
	{
		"taskId":"Z6Loz6WgbbP///////////////8CAAAA",
		"taskName":"remote_worker_task",
		"taskAttempt":0,
		"State":"",
		"jobId":"AgAAAA==",
		"nodeId":"",
		"ActorID":"",
		"placementGroupId":"////////////////////////",
		"taskType":"NORMAL_TASK",
		"functionName":"",
		"language":"PYTHON",
		"requiredResources":{
			"CPU":1
		},
		"StartTime":"0001-01-01T00:00:00Z",
		"EndTime":"0001-01-01T00:00:00Z",
		"workerId":"",
		"errorType":"",
		"errorMessage":"",
		"TaskLogInfo":null,
		"CallSite":"",
		"LabelSelector":null
	}
]
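The merge from events into task objects can be sketched roughly as below. `Task` mirrors a few fields from the curl output above, and `upsert` is a hypothetical helper; the PR's real event server may key and merge entries differently.

```go
package main

import "fmt"

// Task mirrors a few fields from the curl output above; illustrative only.
type Task struct {
	TaskID   string
	TaskName string
	TaskType string
	State    string
}

// upsert returns the task entry for id, creating it if needed.
// Hypothetical helper, not the PR's actual code.
func upsert(tasks map[string]*Task, id string) *Task {
	t, ok := tasks[id]
	if !ok {
		t = &Task{TaskID: id}
		tasks[id] = t
	}
	return t
}

func main() {
	tasks := map[string]*Task{}

	// A TASK_DEFINITION_EVENT contributes the static fields...
	t := upsert(tasks, "Z6Loz...")
	t.TaskName = "remote_worker_task"
	t.TaskType = "NORMAL_TASK"

	// ...and a TASK_LIFECYCLE_EVENT contributes the latest state.
	upsert(tasks, "Z6Loz...").State = "RUNNING"

	fmt.Printf("%+v\n", *tasks["Z6Loz..."])
}
```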

@chiayi chiayi marked this pull request as ready for review December 3, 2025 20:03
logrus.Infof("Event processor received stop signal, exiting.")
return
default:
// S3, MinIO, and GCS are flat structures; object names are whole paths
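As a minimal sketch of what the flat-namespace comment implies: there are no real directories, so listing with a prefix and splitting keys on "/" recovers the logical layout. This assumes the root/<clusterName>/<session_name>/... layout described in the PR; `clusterAndSession` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// clusterAndSession extracts the cluster and session segments from a
// flat object key of the assumed form root/<clusterName>/<sessionName>/...
// Illustrative helper only, not the PR's actual code.
func clusterAndSession(key string) (cluster, session string, ok bool) {
	parts := strings.Split(key, "/")
	if len(parts) < 4 { // need at least root/cluster/session/object
		return "", "", false
	}
	return parts[1], parts[2], true
}

func main() {
	key := "root/mycluster/session_2025-12-02_13-07-25_921175_1/node_events/events.json"
	c, s, ok := clusterAndSession(key)
	fmt.Println(c, s, ok)
}
```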


Is there anything we'd need to do to ensure hygiene for data originating from Azure Blob Storage?

@MengjinYan (Member) commented:

Thanks for the PR! I think the high-level approach looks good!

Some more detailed comments:

  • About task event processing: from Ray's perspective, the unique identifier of a task execution is the tuple (task_id, attempt_number), so different attempts of the same task should probably be separate entries in the task map.
  • Regarding the data model of tasks/actors, should we also store the task state transition timeline?
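The suggested (task_id, attempt_number) keying could look like the sketch below, with illustrative types rather than the PR's actual data model:

```go
package main

import "fmt"

// taskKey keys the task map by (taskID, attempt) instead of taskID
// alone, so each retry of a task becomes a distinct entry.
type taskKey struct {
	TaskID  string
	Attempt int
}

func main() {
	states := map[taskKey]string{}
	states[taskKey{"t1", 0}] = "FAILED"
	states[taskKey{"t1", 1}] = "RUNNING" // the retry does not overwrite attempt 0
	fmt.Println(len(states))
}
```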

I'll wait for a more polished version of the PR to do a more detailed review.

@Future-Outlier (Member) left a comment

If there's some field we can't get from the Ray events, should we get it from the log dir? For example, get the task field profiling_data from

{session_dir}/logs/export_events/event_EXPORT_TASK.log

@Future-Outlier (Member) left a comment

Hi, can you show me step by step how to test this PR in a local env using kind?
It would be very helpful if you could show where the event processor calls Run and reads data from the object storage.
For example, a guide like this would be helpful:

#4187 (review)

@MengjinYan commented:

> If there's some field we can't get from ray event, should we get it from log dir? for example, get the task field profiling_data from
>
> {session_dir}/logs/export_events/event_EXPORT_TASK.log

event_EXPORT_TASK.log is populated by the old event framework. All the events that used to be written to event_EXPORT_TASK.log should already be covered by the new task event framework, and profiling_data should be produced by converting the task profile event.
