diff --git a/historyserver/go.mod b/historyserver/go.mod index f64da8c9309..321527b79f8 100644 --- a/historyserver/go.mod +++ b/historyserver/go.mod @@ -1,3 +1,10 @@ module github.com/ray-project/kuberay/historyserver go 1.24.8 + +require ( + github.com/google/go-cmp v0.7.0 + github.com/sirupsen/logrus v1.9.3 +) + +require golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect diff --git a/historyserver/go.sum b/historyserver/go.sum new file mode 100644 index 00000000000..87666c902f3 --- /dev/null +++ b/historyserver/go.sum @@ -0,0 +1,17 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c 
h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/historyserver/pkg/eventserver/eventprocessor_interface.go b/historyserver/pkg/eventserver/eventprocessor_interface.go new file mode 100644 index 00000000000..37f9a17d6e7 --- /dev/null +++ b/historyserver/pkg/eventserver/eventprocessor_interface.go @@ -0,0 +1,7 @@ +package eventserver + +import "context" + +type EventProcessor[T any] interface { + ProcessEvents(ctx context.Context, ch <-chan T) error +} diff --git a/historyserver/pkg/eventserver/eventserver.go b/historyserver/pkg/eventserver/eventserver.go new file mode 100644 index 00000000000..6b3c835851d --- /dev/null +++ b/historyserver/pkg/eventserver/eventserver.go @@ -0,0 +1,228 @@ +package eventserver + +import ( + "context" + "encoding/json" + "fmt" + "io" + "sync" + "time" + + "github.com/ray-project/kuberay/historyserver/pkg/eventserver/types" + "github.com/ray-project/kuberay/historyserver/pkg/storage" + "github.com/ray-project/kuberay/historyserver/pkg/utils" + "github.com/sirupsen/logrus" +) + +type EventHandler struct { + reader storage.StorageReader + + ClusterTaskMap *types.ClusterTaskMap + ClusterActorMap *types.ClusterActorMap +} + +func NewEventHandler(reader storage.StorageReader) *EventHandler { + return &EventHandler{ + reader: reader, + ClusterTaskMap: &types.ClusterTaskMap{ + ClusterTaskMap: make(map[string]*types.TaskMap), + }, + ClusterActorMap: &types.ClusterActorMap{ + ClusterActorMap: make(map[string]*types.ActorMap), + }, + } +} + +// ProcessEvents reads events from ch and stores each one via storeEvent. It returns nil when ch is closed, ctx.Err() when ctx is cancelled, or the first error returned by storeEvent. +func (h *EventHandler) ProcessEvents(ctx context.Context, ch <-chan map[string]any) error { + logrus.Infof("Starting a event processor channel") + for { + select { + case <-ctx.Done(): + // TODO: The context was cancelled, either stop here or process the rest of the events and return + // Currently, it will just 
stop. + logrus.Warnf("Event processor context was cancelled") + return ctx.Err() + case currEventData, ok := <-ch: + if !ok { + logrus.Warnf("Channel was closed") + return nil + } + if err := h.storeEvent(currEventData); err != nil { + return err + } + } + } +} + +// Run starts numOfEventProcessors (defaults to 2 when 0 is passed) event processing goroutines and the event reader goroutine, then blocks until all of them exit and returns nil. The event reader runs once an hour, +// which is currently how often the collector flushes. NOTE(review): a read or unmarshal failure in the reader goroutine goes through logrus.Fatal/Fatalf, which presumably terminates the whole process; confirm this is intended. +func (h *EventHandler) Run(stop chan struct{}, numOfEventProcessors int) error { + var wg sync.WaitGroup + + if numOfEventProcessors == 0 { + numOfEventProcessors = 2 + } + eventProcessorChannels := make([]chan map[string]any, numOfEventProcessors) + cctx := make([]context.CancelFunc, numOfEventProcessors) + + for i := range numOfEventProcessors { + eventProcessorChannels[i] = make(chan map[string]any, 20) + } + + for i, currEventChannel := range eventProcessorChannels { + wg.Add(1) + ctx, cancel := context.WithCancel(context.Background()) + cctx[i] = cancel + go func() { + defer wg.Done() + var processor EventProcessor[map[string]any] = h + err := processor.ProcessEvents(ctx, currEventChannel) + if err == ctx.Err() { + logrus.Warnf("Event processor go routine %d is now closed", i) + return + } + if err != nil { + logrus.Errorf("event processor %d go routine failed %v", i, err) + return + } + }() + } + + // Start reading files and sending events for processing + wg.Add(1) + go func() { + defer wg.Done() + fmt.Printf("Starting this loop of event processing\n") + for { + select { + case <-stop: + for i, currChan := range eventProcessorChannels { + close(currChan) + cctx[i]() + } + logrus.Infof("Event processor received stop signal, exiting.") + return + default: + // S3, minio, and GCS are flat structures, object names are whole paths + clusterList := h.reader.ListClusters() + for _, clusterInfo := range clusterList { + eventFileList := append(h.getAllJobEventFiles(clusterInfo), h.getAllNodeEventFiles(clusterInfo)...) 
+ + logrus.Infof("current eventFileList for cluster %s is: %v", clusterInfo.Name, eventFileList) + for i := range eventFileList { + // TODO: Filter out ones that have already been read + logrus.Infof("Reading event file: %s", eventFileList[i]) + + eventioReader := h.reader.GetContent(clusterInfo.Name, eventFileList[i]) + eventbytes, err := io.ReadAll(eventioReader) + if err != nil { + logrus.Fatal(err) + return + } + + // Unmarshal the list of events + var eventList []map[string]any + if err := json.Unmarshal(eventbytes, &eventList); err != nil { + logrus.Fatalf("Failed to unmarshal event: %v", err) + return + } + + // Evenly distribute events across the channels + for i, curr := range eventList { + // current index % number of event processors dictates which channel it goes to + curr["clusterName"] = clusterInfo.Name + eventProcessorChannels[i%numOfEventProcessors] <- curr + } + } + } + } + + // Sleep 1 hour since that is how often the collector flushes. NOTE(review): the stop channel is only polled at the top of each iteration, so a stop signal is not observed while sleeping. + logrus.Infof("Finished reading files, going to sleep...") + time.Sleep(1 * time.Hour) + } + }() + + wg.Wait() + return nil +} + +// storeEvent unmarshals the event map into the matching struct and stores it in the corresponding per-cluster map. Only TASK_DEFINITION_EVENT is handled so far; the other listed event types are no-ops and unknown types are logged and skipped. NOTE(review): the unchecked string type assertions on the eventType and clusterName keys panic when a key is missing or not a string. +func (h *EventHandler) storeEvent(eventMap map[string]any) error { + eventType := types.EventType(eventMap["eventType"].(string)) + currentClusterName := eventMap["clusterName"].(string) + logrus.Infof("current eventType: %v", eventType) + switch eventType { + case types.TASK_DEFINITION_EVENT: + // We take out the taskDefinitionEvent from the event, marshal it into json so we can + // unmarshal it into a Task object. 
+ var currTask types.Task + taskDef, ok := eventMap["taskDefinitionEvent"] + if !ok { + return fmt.Errorf("event does not have 'taskDefinitionEvent'") + } + jsonTaskDefinition, err := json.Marshal(taskDef) + if err != nil { + return err + } + + err = json.Unmarshal(jsonTaskDefinition, &currTask) + if err != nil { + return err + } + + clusterTaskMapObject, ok := h.ClusterTaskMap.ClusterTaskMap[currentClusterName] + if !ok { + // Does not exist, create a new task map. NOTE(review): this lookup and insert on the outer map happen without any lock while Run starts several processor goroutines sharing h; looks like a data race, confirm. + clusterTaskMapObject = types.NewTaskMap() + h.ClusterTaskMap.ClusterTaskMap[currentClusterName] = clusterTaskMapObject + } + + taskId := currTask.TaskID + clusterTaskMapObject.Lock() + storedTask, ok := clusterTaskMapObject.TaskMap[taskId] + if !ok { + clusterTaskMapObject.TaskMap[taskId] = currTask + } else { + // TODO: see if there are any fields that need to be added or updated, e.g. taskAttempt + if storedTask.AttemptNumber < currTask.AttemptNumber { + storedTask.AttemptNumber = currTask.AttemptNumber + } + clusterTaskMapObject.TaskMap[taskId] = storedTask + } + clusterTaskMapObject.Unlock() + case types.TASK_LIFECYCLE_EVENT: + + case types.ACTOR_DEFINITION_EVENT: + + case types.ACTOR_LIFECYCLE_EVENT: + + case types.ACTOR_TASK_DEFINITION_EVENT: + default: + logrus.Infof("Event not supported, skipping: %v", eventMap) + } + + return nil +} + +// getAllJobEventFiles gets all the job event files for the given cluster. +// Assumes that the event file objects follow the format root/clustername/sessionid/job_events/{job-*}/* +func (h *EventHandler) getAllJobEventFiles(clusterInfo utils.ClusterInfo) []string { + var allJobFiles []string + jobEventDirPrefix := clusterInfo.Name + "/" + clusterInfo.SessionName + "/job_events/" + jobDirList := h.reader.ListFiles(clusterInfo.Name, jobEventDirPrefix) + for _, currJobDir := range jobDirList { + allJobFiles = append(allJobFiles, h.reader.ListFiles(clusterInfo.Name, currJobDir)...) 
+ } + + return allJobFiles +} + +// getAllNodeEventFiles gets all the node event files for the given cluster. +// Assumes that the event file objects follow the format root/clustername/sessionid/node_events/* +func (h *EventHandler) getAllNodeEventFiles(clusterInfo utils.ClusterInfo) []string { + nodeEventDirPrefix := clusterInfo.Name + "/" + clusterInfo.SessionName + "/node_events/" + nodeEventFiles := h.reader.ListFiles(clusterInfo.Name, nodeEventDirPrefix) + return nodeEventFiles +} diff --git a/historyserver/pkg/eventserver/eventserver_test.go b/historyserver/pkg/eventserver/eventserver_test.go new file mode 100644 index 00000000000..be2e5a8bec0 --- /dev/null +++ b/historyserver/pkg/eventserver/eventserver_test.go @@ -0,0 +1,348 @@ +package eventserver + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/ray-project/kuberay/historyserver/pkg/eventserver/types" +) + +func makeTaskEventMap(taskName, nodeId, taskID, cluster string, attempt int) map[string]any { + return map[string]any{ + "eventType": string(types.TASK_DEFINITION_EVENT), + "clusterName": cluster, + "taskDefinitionEvent": map[string]any{ + "taskId": taskID, + "taskName": taskName, + "nodeId": nodeId, + "taskAttempt": attempt, + }, + } +} + +func TestEventProcessor(t *testing.T) { + tests := []struct { + name string + // Setup + eventsToSend []map[string]any + cancelAfter time.Duration // Time after which to cancel context (0 for no cancel) + closeChan bool // Whether to close the channel after sending events + + // Expectations + wantErr bool + expectedErrType error // Specific error type to check (e.g., context.Canceled) + wantStoredEvents map[string]types.Task + }{ + { + name: "process multiple events then close channel", + eventsToSend: []map[string]any{ + { + "clusterName": "cluster1", + "eventType": "TASK_DEFINITION_EVENT", + "taskDefinitionEvent": map[string]any{ + "taskId": "ID_12345", + "taskName": "Name_12345", + "nodeId": "Nodeid_12345", 
+ "taskAttempt": 2, + }, + }, + { + "clusterName": "cluster1", + "eventType": "TASK_DEFINITION_EVENT", + "taskDefinitionEvent": map[string]any{ + "taskId": "ID_54321", + "taskName": "Name_54321", + "nodeId": "Nodeid_54321", + "taskAttempt": 1, + }, + }, + }, + closeChan: true, + wantStoredEvents: map[string]types.Task{ + "ID_12345": { + TaskID: "ID_12345", + Name: "Name_12345", + NodeID: "Nodeid_12345", + AttemptNumber: 2, + }, + "ID_54321": { + TaskID: "ID_54321", + Name: "Name_54321", + NodeID: "Nodeid_54321", + AttemptNumber: 1, + }, + }, + }, + { + name: "channel closed immediately", + closeChan: true, + wantErr: false, + }, + { + name: "context canceled", + eventsToSend: []map[string]any{ + { + "clusterName": "cluster1", + "eventType": "TASK_DEFINITION_EVENT", + "taskDefinitionEvent": map[string]any{ + "taskId": "ID_12345", + "taskName": "Name_12345", + "nodeId": "Nodeid_12345", + "taskAttempt": 2, + }, + }, + }, + cancelAfter: 50 * time.Millisecond, + wantErr: true, + expectedErrType: context.Canceled, + // Event might be processed before cancellation is detected + wantStoredEvents: map[string]types.Task{ + "ID_12345": { + TaskID: "ID_12345", + Name: "Name_12345", + NodeID: "Nodeid_12345", + AttemptNumber: 2, + }, + }, + }, + { + name: "no events, context canceled", + cancelAfter: 10 * time.Millisecond, + wantErr: true, + expectedErrType: context.Canceled, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Sending nil for reader since it won't be used anyways + h := NewEventHandler(nil) + + // Channel buffer size a bit larger than events to avoid blocking sender in test setup + ch := make(chan map[string]any, len(tt.eventsToSend)+2) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Send events into the channel + go func() { + for _, event := range tt.eventsToSend { + select { + case ch <- event: + case <-ctx.Done(): // Stop sending if context is cancelled + return + } + } + if tt.closeChan { + 
close(ch) + } + }() + + // Handle context cancellation if specified + if tt.cancelAfter > 0 { + go func() { + time.Sleep(tt.cancelAfter) + cancel() + }() + } + + // Run ProcessEvents + err := h.ProcessEvents(ctx, ch) + + // Check error expectations + if (err != nil) != tt.wantErr { + t.Errorf("ProcessEvents() error = %v, wantErr %v", err, tt.wantErr) + } + if tt.expectedErrType != nil { + if !errors.Is(err, tt.expectedErrType) { + t.Errorf("ProcessEvents() error type = %T, want type %T (err: %v)", err, tt.expectedErrType, err) + } + } + + // Check stored events. NOTE(review): the cluster1 entry is dereferenced without an existence check, so this panics if no event was stored before ProcessEvents returned. + if tt.wantStoredEvents != nil { + if diff := cmp.Diff(tt.wantStoredEvents, h.ClusterTaskMap.ClusterTaskMap["cluster1"].TaskMap); diff != "" { + t.Errorf("storeEventCalls diff (-want +got):\n%s", diff) + } + } + }) + } +} + +func TestStoreEvent(t *testing.T) { + initialTask := types.Task{ + TaskID: "taskid1", + Name: "taskName123", + NodeID: "nodeid123", + AttemptNumber: 0, + } + tests := []struct { + name string + initialState *types.ClusterTaskMap + eventMap map[string]any + wantErr bool + wantClusterCount int + wantTaskInCluster string // Cluster to check for the task + wantTaskID string // TaskID to check + wantTask *types.Task // Expected task, nil if not applicable + }{ + { + name: "unsupported event type", + initialState: &types.ClusterTaskMap{ + ClusterTaskMap: make(map[string]*types.TaskMap), + }, + eventMap: map[string]any{ + "eventType": "UNKNOWN_TYPE", + "clusterName": "c1", + }, + wantErr: false, + wantClusterCount: 0, + }, + { + name: "task event - new cluster and new task", + initialState: &types.ClusterTaskMap{ + ClusterTaskMap: make(map[string]*types.TaskMap), + }, + eventMap: makeTaskEventMap("taskName123", "nodeid1234", "taskid1", "cluster1", 0), + wantErr: false, + wantClusterCount: 1, + wantTaskInCluster: "cluster1", + wantTaskID: "taskid1", + wantTask: &types.Task{ + TaskID: "taskid1", + Name: "taskName123", + NodeID: "nodeid1234", + AttemptNumber: 0, + }, + }, + { + name: "task 
event - existing cluster, new task", + initialState: &types.ClusterTaskMap{ + ClusterTaskMap: map[string]*types.TaskMap{ + "cluster1": types.NewTaskMap(), + }, + }, + eventMap: makeTaskEventMap("taskName123", "nodeid1234", "taskid2", "cluster1", 1), + wantErr: false, + wantClusterCount: 1, + wantTaskInCluster: "cluster1", + wantTaskID: "taskid2", + wantTask: &types.Task{ + TaskID: "taskid2", + Name: "taskName123", + NodeID: "nodeid1234", + AttemptNumber: 1, + }, + }, + { + name: "task event - existing cluster and existing task", + initialState: &types.ClusterTaskMap{ + ClusterTaskMap: map[string]*types.TaskMap{ + "cluster1": { + TaskMap: map[string]types.Task{ + "taskid1": initialTask, + }, + }, + }, + }, + eventMap: makeTaskEventMap("taskName123", "nodeid123", "taskid1", "cluster1", 2), + wantErr: false, + wantClusterCount: 1, + wantTaskInCluster: "cluster1", + wantTaskID: "taskid1", + wantTask: &types.Task{ + TaskID: "taskid1", + Name: "taskName123", + NodeID: "nodeid123", + AttemptNumber: 2, + }, + }, + { + name: "task event - missing taskDefinitionEvent", + initialState: &types.ClusterTaskMap{ + ClusterTaskMap: make(map[string]*types.TaskMap), + }, + eventMap: map[string]any{ + "eventType": string(types.TASK_DEFINITION_EVENT), + "clusterName": "c1", + }, + wantErr: true, + }, + { + name: "task event - taskDefinitionEvent wrong type", + initialState: &types.ClusterTaskMap{ + ClusterTaskMap: make(map[string]*types.TaskMap), + }, + eventMap: map[string]any{ + "eventType": string(types.TASK_DEFINITION_EVENT), + "clusterName": "c1", + "taskDefinitionEvent": "not a map", + }, + wantErr: true, // Marshal of a string succeeds; the Unmarshal into types.Task fails + }, + { + name: "task event - invalid task structure", + initialState: &types.ClusterTaskMap{ + ClusterTaskMap: make(map[string]*types.TaskMap), + }, + eventMap: map[string]any{ + "eventType": string(types.TASK_DEFINITION_EVENT), + "clusterName": "c1", + "taskDefinitionEvent": map[string]any{ + "taskId": 123, // Should be string + "taskAttempt": 0, + }, + 
}, + wantErr: true, // Unmarshal will fail + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + h := &EventHandler{ + ClusterTaskMap: tt.initialState, + } + if h.ClusterTaskMap == nil { + h.ClusterTaskMap = &types.ClusterTaskMap{ + ClusterTaskMap: make(map[string]*types.TaskMap), + } + } + + err := h.storeEvent(tt.eventMap) + + if (err != nil) != tt.wantErr { + t.Fatalf("storeEvent() error = %v, wantErr %v", err, tt.wantErr) + } + if err != nil { + return + } + + gotClusterCount := len(h.ClusterTaskMap.ClusterTaskMap) + + if gotClusterCount != tt.wantClusterCount { + t.Errorf("storeEvent() resulted in %d clusters, want %d", gotClusterCount, tt.wantClusterCount) + } + + if tt.wantTask != nil { + clusterObj, clusterExists := h.ClusterTaskMap.ClusterTaskMap[tt.wantTaskInCluster] + + if !clusterExists { + t.Fatalf("storeEvent() cluster %s not found", tt.wantTaskInCluster) + } + + clusterObj.Lock() + defer clusterObj.Unlock() + gotTask, taskExists := clusterObj.TaskMap[tt.wantTaskID] + if !taskExists { + t.Fatalf("storeEvent() task %s not found in cluster %s", tt.wantTaskID, tt.wantTaskInCluster) + } + + if diff := cmp.Diff(*tt.wantTask, gotTask); diff != "" { + t.Errorf("storeEvent() task mismatch (-want +got):\n%s", diff) + } + } + }) + } +} diff --git a/historyserver/pkg/eventserver/types/actor.go b/historyserver/pkg/eventserver/types/actor.go new file mode 100644 index 00000000000..cc8fec55f6a --- /dev/null +++ b/historyserver/pkg/eventserver/types/actor.go @@ -0,0 +1,67 @@ +package types + +import ( + "sync" + "time" +) + +type StateType string + +const ( + DEPENDENCIES_UNREADY StateType = "DEPENDENCIES_UNREADY" + PENDING_CREATION StateType = "PENDING_CREATION" + ALIVE StateType = "ALIVE" + RESTARTING StateType = "RESTARTING" + DEAD StateType = "DEAD" +) + +type Address struct { + NodeID string + IPAddress string + Port string + WorkerID string +} + +type Actor struct { + ActorID string + JobID string + PlacementGroupID string + State 
StateType + PID string + Address Address + Name string + NumRestarts string + ActorClass string + StartTime time.Time + EndTime time.Time + RequiredResources map[string]int + ExitDetails string + ReprName string + CallSite string + LabelSelector map[string]string +} + +// ActorMap is a struct that uses ActorID as key and the Actor struct as value +type ActorMap struct { + ActorMap map[string]Actor + Mu sync.Mutex +} + +func (a *ActorMap) Lock() { + a.Mu.Lock() +} + +func (a *ActorMap) Unlock() { + a.Mu.Unlock() +} + +func NewActorMap() *ActorMap { + return &ActorMap{ + ActorMap: make(map[string]Actor), + } +} + +// ClusterActorMap uses the cluster name as the key +type ClusterActorMap struct { + ClusterActorMap map[string]*ActorMap +} diff --git a/historyserver/pkg/eventserver/types/event.go b/historyserver/pkg/eventserver/types/event.go new file mode 100644 index 00000000000..257ed3373cf --- /dev/null +++ b/historyserver/pkg/eventserver/types/event.go @@ -0,0 +1,17 @@ +package types + +type EventType string + +const ( + EVENT_TYPE_UNSPECIFIED EventType = "EVENT_TYPE_UNSPECIFIED" + TASK_DEFINITION_EVENT EventType = "TASK_DEFINITION_EVENT" + TASK_LIFECYCLE_EVENT EventType = "TASK_LIFECYCLE_EVENT" + ACTOR_TASK_DEFINITION_EVENT EventType = "ACTOR_TASK_DEFINITION_EVENT" + TASK_PROFILE_EVENT EventType = "TASK_PROFILE_EVENT" + DRIVER_JOB_DEFINITION_EVENT EventType = "DRIVER_JOB_DEFINITION_EVENT" + DRIVER_JOB_LIFECYCLE_EVENT EventType = "DRIVER_JOB_LIFECYCLE_EVENT" + NODE_DEFINITION_EVENT EventType = "NODE_DEFINITION_EVENT" + NODE_LIFECYCLE_EVENT EventType = "NODE_LIFECYCLE_EVENT" + ACTOR_DEFINITION_EVENT EventType = "ACTOR_DEFINITION_EVENT" + ACTOR_LIFECYCLE_EVENT EventType = "ACTOR_LIFECYCLE_EVENT" +) diff --git a/historyserver/pkg/eventserver/types/task.go b/historyserver/pkg/eventserver/types/task.go new file mode 100644 index 00000000000..dfad44834e1 --- /dev/null +++ b/historyserver/pkg/eventserver/types/task.go @@ -0,0 +1,82 @@ +package types + +import ( + "sync" + 
"time" +) + +type TaskStatus string + +const ( + NIL TaskStatus = "NIL" + PENDING_ARGS_AVAIL TaskStatus = "PENDING_ARGS_AVAIL" + PENDING_NODE_ASSIGNMENT TaskStatus = "PENDING_NODE_ASSIGNMENT" + PENDING_OBJ_STORE_MEM_AVAIL TaskStatus = "PENDING_OBJ_STORE_MEM_AVAIL" + PENDING_ARGS_FETCH TaskStatus = "PENDING_ARGS_FETCH" + SUBMITTED_TO_WORKER TaskStatus = "SUBMITTED_TO_WORKER" + PENDING_ACTOR_TASK_ARGS_FETCH TaskStatus = "PENDING_ACTOR_TASK_ARGS_FETCH" + PENDING_ACTOR_TASK_ORDERING_OR_CONCURRENCY TaskStatus = "PENDING_ACTOR_TASK_ORDERING_OR_CONCURRENCY" + RUNNING TaskStatus = "RUNNING" + RUNNING_IN_RAY_GET TaskStatus = "RUNNING_IN_RAY_GET" + RUNNING_IN_RAY_WAIT TaskStatus = "RUNNING_IN_RAY_WAIT" + FINISHED TaskStatus = "FINISHED" + FAILED TaskStatus = "FAILED" +) + +type TaskType string + +const ( + NORMAL_TASK TaskType = "NORMAL_TASK" + ACTOR_CREATION_TASK TaskType = "ACTOR_CREATION_TASK" + ACTOR_TASK TaskType = "ACTOR_TASK" + DRIVER_TASK TaskType = "DRIVER_TASK" +) + +type Task struct { + TaskID string `json:"taskId"` + Name string `json:"taskName"` + AttemptNumber int `json:"taskAttempt"` + State TaskStatus + JobID string `json:"jobId"` + NodeID string `json:"nodeId"` + ActorID string + PlacementGroupID string `json:"placementGroupId"` + Type TaskType `json:"taskType"` + FuncOrClassName string `json:"functionName"` + Language string `json:"language"` + RequiredResources map[string]int `json:"requiredResources"` + StartTime time.Time + EndTime time.Time + // ProfilingData ProfilingData + WorkerID string `json:"workerId"` + ErrorType string `json:"errorType"` + ErrorMessage string `json:"errorMessage"` + TaskLogInfo map[string]string + CallSite string + LabelSelector map[string]string +} + +// TaskMap is a struct that uses TaskID as key and the Task struct as value +type TaskMap struct { + TaskMap map[string]Task + Mu sync.Mutex +} + +func (t *TaskMap) Lock() { + t.Mu.Lock() +} + +func (t *TaskMap) Unlock() { + t.Mu.Unlock() +} + +func NewTaskMap() *TaskMap { + 
return &TaskMap{ + TaskMap: make(map[string]Task), + } +} + +// ClusterTaskMap uses the cluster name as the key +type ClusterTaskMap struct { + ClusterTaskMap map[string]*TaskMap +} diff --git a/historyserver/pkg/storage/storage_interface.go b/historyserver/pkg/storage/storage_interface.go new file mode 100644 index 00000000000..9348a562440 --- /dev/null +++ b/historyserver/pkg/storage/storage_interface.go @@ -0,0 +1,26 @@ +package storage + +import ( + "io" + + "github.com/ray-project/kuberay/historyserver/pkg/utils" +) + +// StorageWriter is the interface for writing to storage +type StorageWriter interface { + // CreateDirectory should create directory using given path + CreateDirectory(path string) error + // WriteFile should write everything from the reader + WriteFile(file string, reader io.ReadSeeker) error +} + +// StorageReader is the interface for reading from storage +type StorageReader interface { + // ListClusters returns a list of all available clusters and their information + ListClusters() []utils.ClusterInfo + // GetContent will return a reader given the clusterID and the file to read + GetContent(clusterId string, file string) io.Reader + // ListFiles will return a list of files of current directory given the cluster and the directory + // S3, minio, and GCS are all flat object storages, this assumes that file names are also "paths" + ListFiles(clusterId, directory string) []string +} diff --git a/historyserver/pkg/utils/types.go b/historyserver/pkg/utils/types.go new file mode 100644 index 00000000000..861bedb7a06 --- /dev/null +++ b/historyserver/pkg/utils/types.go @@ -0,0 +1,28 @@ +package utils + +/* +Copyright 2024 by the zhangjie bingyu.zj@alibaba-inc.com Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +type ClusterInfo struct { + Name string `json:"name"` + Namespace string `json:"namespace"` + SessionName string `json:"sessionName"` + CreateTime string `json:"createTime"` + CreateTimeStamp int64 `json:"createTimeStamp"` +} + +type ClusterInfoList []ClusterInfo + +func (a ClusterInfoList) Len() int { return len(a) } +func (a ClusterInfoList) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ClusterInfoList) Less(i, j int) bool { return a[i].CreateTimeStamp > a[j].CreateTimeStamp } // Sorts in descending order of CreateTimeStamp