From 4363168afcbbbaff3e2415b1f8c42d3a10de400b Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Mon, 29 Dec 2025 13:21:18 +0000 Subject: [PATCH 01/17] feat(historyserver):re-push prev-logs on pod restart Signed-off-by: my-vegetable-has-exploded --- .../runtime/logcollector/collector.go | 75 +++++++++++++++++-- 1 file changed, 68 insertions(+), 7 deletions(-) diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go index a2d4f09bfcb..180030abc98 100644 --- a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go +++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go @@ -62,6 +62,11 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error { // Setup signal handling for SIGTERM sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGTERM) + + // On startup, scan and process all existing prev-logs + // This ensures that any incomplete uploads from previous runs are resumed + go r.scanAndProcessExistingPrevLogs() + go r.WatchPrevLogsLoops() if r.EnableMeta { go r.WatchSessionLatestLoops() // Watch session_latest symlink changes @@ -354,6 +359,43 @@ func (r *RayLogHandler) WatchLogsLoops(watcher *fsnotify.Watcher, walkPath strin } } +// scanAndProcessExistingPrevLogs scans the prev-logs directory on startup +// and processes any existing sessions/nodes that may have been left from previous runs. +// This ensures resumption of incomplete uploads after collector restart. +func (r *RayLogHandler) scanAndProcessExistingPrevLogs() { + watchPath := r.prevLogsDir + + logrus.Infof("Starting initial scan of prev-logs directory: %s", watchPath) + + // Check if prev-logs directory exists + if _, err := os.Stat(watchPath); os.IsNotExist(err) { + logrus.Infof("prev-logs directory does not exist on startup, nothing to process") + return + } + + // Read all session directories (first level only) + sessionEntries, err := os.ReadDir(watchPath) + if err != nil { + logrus.Errorf("Failed to read prev-logs directory %s: %v", watchPath, err) + return + } + + for _, sessionEntry := range sessionEntries { + if !sessionEntry.IsDir() { + continue + } + + sessionID := sessionEntry.Name() + sessionPath := filepath.Join(watchPath, sessionID) + logrus.Infof("Found existing session directory on startup: %s", sessionID) + + // Process all node directories under this session + r.processSessionPrevLogs(sessionPath) + } + + logrus.Infof("Completed initial scan of prev-logs directory") +} + func (r *RayLogHandler) WatchPrevLogsLoops() { watchPath := r.prevLogsDir @@ -632,6 +674,26 @@ func (r *RayLogHandler) processSessionPrevLogs(sessionDir string) { } } +// isFileAlreadyPersisted checks if a file has already been persisted to persist-complete-logs +func (r *RayLogHandler) isFileAlreadyPersisted(absoluteLogPath, sessionID, nodeID string) bool { + // Calculate the relative path within the logs directory + logsDir := filepath.Join("/tmp/ray/prev-logs", sessionID, nodeID, "logs") + relativePath, err := filepath.Rel(logsDir, absoluteLogPath) + if err != nil { + logrus.Errorf("Failed to get relative path for %s: %v", absoluteLogPath, err) + return false + } + + // Construct the path in persist-complete-logs + persistedPath := filepath.Join("/tmp/ray/persist-complete-logs", sessionID, nodeID, "logs", relativePath) + + // Check if the file exists + if _, err := os.Stat(persistedPath); err == nil { + return true + } + return false +} + // processPrevLogsDir processes logs in a /tmp/ray/prev-logs/{sessionid}/{nodeid} directory func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) { // Extract session ID and node ID from the path @@ -653,13 +715,6 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) { return } - // Check if this directory has already been processed by checking in persist-complete-logs - completeDir := filepath.Join("/tmp/ray/persist-complete-logs", sessionID, nodeID, "logs") - if _, err := os.Stat(completeDir); err == nil { - logrus.Infof("Session %s node %s logs already processed, skipping", sessionID, nodeID) - return - } - logrus.Infof("Processing prev-logs for session: %s, node: %s", sessionID, nodeID) logsDir := filepath.Join(sessionNodeDir, "logs") @@ -690,6 +745,12 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) { return nil } + // Check if this file has already been persisted + if r.isFileAlreadyPersisted(path, sessionID, nodeID) { + logrus.Debugf("File %s already persisted, skipping", path) + return nil + } + // Process log file if err := r.processPrevLogFile(path, logsDir, sessionID, nodeID); err != nil { logrus.Errorf("Failed to process prev-log file %s: %v", path, err) From 9476fbb6fadd4f0bb1ff666a4c596e666b9a53e1 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Tue, 30 Dec 2025 02:53:01 +0000 Subject: [PATCH 02/17] chroe(historyserver): replace hard code path Signed-off-by: my-vegetable-has-exploded --- .../logcollector/runtime/logcollector/collector.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go index 180030abc98..fa12a68e164 100644 --- a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go +++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go @@ -35,6 +35,7 @@ type RayLogHandler struct { RootDir string SessionDir string prevLogsDir string + persistCompleteLogsDir string PushInterval time.Duration LogBatching int filePathMu sync.Mutex @@ -49,6 +50,7 @@ func (r *RayLogHandler) Start(stop <-chan struct{}) error { func (r *RayLogHandler) Run(stop <-chan struct{}) error { // watchPath := r.LogDir r.prevLogsDir = "/tmp/ray/prev-logs" + r.persistCompleteLogsDir = "/tmp/ray/persist-complete-logs" // Initialize log file paths storage r.logFilePaths = make(map[string]bool) @@ -413,7 +415,7 @@ func (r *RayLogHandler) WatchPrevLogsLoops() { } // Also check and create persist-complete-logs directory - completeLogsDir := "/tmp/ray/persist-complete-logs" + completeLogsDir := r.persistCompleteLogsDir if _, err := os.Stat(completeLogsDir); os.IsNotExist(err) { logrus.Infof("persist-complete-logs directory does not exist, creating it: %s", completeLogsDir) if err := os.MkdirAll(completeLogsDir, 0o777); err != nil { @@ -677,7 +679,7 @@ func (r *RayLogHandler) processSessionPrevLogs(sessionDir string) { // isFileAlreadyPersisted checks if a file has already been persisted to persist-complete-logs func (r *RayLogHandler) isFileAlreadyPersisted(absoluteLogPath, sessionID, nodeID string) bool { // Calculate the relative path within the logs directory - logsDir := filepath.Join("/tmp/ray/prev-logs", sessionID, nodeID, "logs") + logsDir := filepath.Join(r.prevLogsDir, sessionID, nodeID, "logs") relativePath, err := filepath.Rel(logsDir, absoluteLogPath) if err != nil { logrus.Errorf("Failed to get relative path for %s: %v", absoluteLogPath, err) @@ -685,7 +687,7 @@ func (r *RayLogHandler) isFileAlreadyPersisted(absoluteLogPath, sessionID, nodeI } // Construct the path in persist-complete-logs - persistedPath := filepath.Join("/tmp/ray/persist-complete-logs", sessionID, nodeID, "logs", relativePath) + persistedPath := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID, "logs", relativePath) // Check if the file exists if _, err := os.Stat(persistedPath); err == nil { @@ -817,7 +819,7 @@ func (r *RayLogHandler) processPrevLogFile(absoluteLogPathName, localLogDir, ses logrus.Infof("Successfully wrote object %s, size: %d bytes", objectName, len(content)) // Move the processed file to persist-complete-logs directory to avoid re-uploading - completeBaseDir := filepath.Join("/tmp/ray/persist-complete-logs", sessionID, nodeID) + completeBaseDir := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID) completeDir := filepath.Join(completeBaseDir, "logs") if _, err := os.Stat(completeDir); os.IsNotExist(err) { From 9d8456abe1e1130df16adac14a0f4d5d1153080d Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Tue, 30 Dec 2025 03:00:43 +0000 Subject: [PATCH 03/17] fmt. Signed-off-by: my-vegetable-has-exploded --- .../runtime/logcollector/collector.go | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go index fa12a68e164..fbc6e1e3ac6 100644 --- a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go +++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go @@ -22,24 +22,24 @@ import ( ) type RayLogHandler struct { - Writer storage.StorageWriter - LogFiles chan string - HttpClient *http.Client - ShutdownChan chan struct{} - logFilePaths map[string]bool - MetaDir string - RayClusterName string - LogDir string - RayNodeName string - RayClusterID string - RootDir string - SessionDir string - prevLogsDir string + Writer storage.StorageWriter + LogFiles chan string + HttpClient *http.Client + ShutdownChan chan struct{} + logFilePaths map[string]bool + MetaDir string + RayClusterName string + LogDir string + RayNodeName string + RayClusterID string + RootDir string + SessionDir string + prevLogsDir string persistCompleteLogsDir string - PushInterval time.Duration - LogBatching int - filePathMu sync.Mutex - EnableMeta bool + PushInterval time.Duration + LogBatching int + filePathMu sync.Mutex + EnableMeta bool } func (r *RayLogHandler) Start(stop <-chan struct{}) error { From 1404f14afaf3d7ab7d4e2a699f72a9883d185616 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Tue, 30 Dec 2025 07:04:52 +0000 Subject: [PATCH 04/17] test(historyserver): add test for logcollector restart Signed-off-by: my-vegetable-has-exploded --- .../runtime/logcollector/collector_test.go | 174 ++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go new file mode 100644 index 00000000000..c8235705232 --- /dev/null +++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go @@ -0,0 +1,174 @@ +package logcollector + +import ( + "io" + "os" + "path/filepath" + "sync" + "testing" + "time" +) + +// MockStorageWriter is a mock implementation of storage.StorageWriter for testing +type MockStorageWriter struct { + mu sync.Mutex + createdDirs []string + writtenFiles map[string]string // path -> content +} + +func NewMockStorageWriter() *MockStorageWriter { + return &MockStorageWriter{ + createdDirs: make([]string, 0), + writtenFiles: make(map[string]string), + } +} + +func (m *MockStorageWriter) CreateDirectory(path string) error { + m.mu.Lock() + defer m.mu.Unlock() + m.createdDirs = append(m.createdDirs, path) + return nil +} + +func (m *MockStorageWriter) WriteFile(file string, reader io.ReadSeeker) error { + content, err := io.ReadAll(reader) + if err != nil { + return err + } + m.mu.Lock() + defer m.mu.Unlock() + m.writtenFiles[file] = string(content) + return nil +} + +// setupRayTestEnvironment creates test directories under /tmp/ray for realistic testing +// This matches the actual paths used by the logcollector +func setupRayTestEnvironment(t *testing.T) (string, func()) { + baseDir := filepath.Join("/tmp", "ray-test-"+t.Name()) + + // Create base directory + if err := os.MkdirAll(baseDir, 0755); err != nil { + t.Fatalf("Failed to create base dir: %v", err) + } + + // Create prev-logs and persist-complete-logs directories + prevLogsDir := filepath.Join(baseDir, "prev-logs") + persistLogsDir := filepath.Join(baseDir, "persist-complete-logs") + + if err := os.MkdirAll(prevLogsDir, 0755); err != nil { + t.Fatalf("Failed to create prev-logs dir: %v", err) + } + if err := os.MkdirAll(persistLogsDir, 0755); err != nil { + t.Fatalf("Failed to create persist-complete-logs dir: %v", err) + } + + cleanup := func() { + os.RemoveAll(baseDir) + } + + return baseDir, cleanup +} + +// createTestLogFile creates a test log file with given content +func createTestLogFile(t *testing.T, path string, content string) { + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0755); err != nil { + t.Fatalf("Failed to create directory %s: %v", dir, err) + } + + if err := os.WriteFile(path, []byte(content), 0644); err != nil { + t.Fatalf("Failed to write file %s: %v", path, err) + } +} + +// TestIsFileAlreadyPersisted tests the file-level persistence check +func TestIsFileAlreadyPersisted(t *testing.T) { + baseDir, cleanup := setupRayTestEnvironment(t) + defer cleanup() + + // Use the actual prev-logs directory structure that matches production + handler := &RayLogHandler{ + prevLogsDir: filepath.Join(baseDir, "prev-logs"), + persistCompleteLogsDir: filepath.Join(baseDir, "persist-complete-logs"), + } + + sessionID := "session-123" + nodeID := "node-456" + + // Create prev-logs structure + prevLogsPath := filepath.Join(handler.prevLogsDir, sessionID, nodeID, "logs", "worker.log") + createTestLogFile(t, prevLogsPath, "test log content") + + // Test case 1: File not yet persisted + if handler.isFileAlreadyPersisted(prevLogsPath, sessionID, nodeID) { + t.Error("Expected file to not be persisted yet") + } + + // Create the persisted file in persist-complete-logs + persistedPath := filepath.Join(baseDir, "persist-complete-logs", sessionID, nodeID, "logs", "worker.log") + createTestLogFile(t, persistedPath, "test log content") + + // Test case 2: File already persisted + if !handler.isFileAlreadyPersisted(prevLogsPath, sessionID, nodeID) { + t.Error("Expected file to be detected as persisted") + } +} + +// TestScanAndProcess tests the full lifecycle: partial upload, interruption, and resumption via scan +func TestScanAndProcess(t *testing.T) { + baseDir, cleanup := setupRayTestEnvironment(t) + defer cleanup() + + mockWriter := NewMockStorageWriter() + handler := &RayLogHandler{ + Writer: mockWriter, + RootDir: "/test-root", + prevLogsDir: filepath.Join(baseDir, "prev-logs"), + persistCompleteLogsDir: filepath.Join(baseDir, "persist-complete-logs"), + ShutdownChan: make(chan struct{}), + RayClusterName: "test-cluster", + RayClusterID: "cluster-123", + } + + sessionID := "session-lifecycle" + nodeID := "node-1" + logsDir := filepath.Join(handler.prevLogsDir, sessionID, nodeID, "logs") + + // Prepare two files + f1 := filepath.Join(logsDir, "file1.log") + f2 := filepath.Join(logsDir, "file2.log") + createTestLogFile(t, f1, "content1") + createTestLogFile(t, f2, "content2") + + // --- Step 1: Process file1 only (simulating partial success) --- + err := handler.processPrevLogFile(f1, logsDir, sessionID, nodeID) + if err != nil { + t.Fatalf("Failed to process file1: %v", err) + } + + // Verify file1 is in storage + if len(mockWriter.writtenFiles) != 1 { + t.Errorf("Expected 1 file in storage, got %d", len(mockWriter.writtenFiles)) + } + + // Manually restore file1 to prev-logs to simulate a crash right after upload but before/during rename + createTestLogFile(t, f1, "content1") + + // --- Step 2: Run startup scan --- + handler.scanAndProcessExistingPrevLogs() + + // Wait for async processing + time.Sleep(200 * time.Millisecond) + + // --- Step 3: Final Verification --- + // 1. Storage should have 2 unique files (file1 should NOT be re-uploaded) + if len(mockWriter.writtenFiles) != 2 { + t.Errorf("Expected 2 unique files in storage, got %d", len(mockWriter.writtenFiles)) + } + + // 2. The node directory in prev-logs should be removed now that all files are processed + sessionNodeDir := filepath.Join(handler.prevLogsDir, sessionID, nodeID) + if _, err := os.Stat(sessionNodeDir); !os.IsNotExist(err) { + t.Error("Node directory should be removed after all files are processed and moved") + } +} From 0467368298b5e233c004f6f7129d2df9e8685892 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Wed, 7 Jan 2026 15:07:58 +0000 Subject: [PATCH 05/17] add e2e test for repush Signed-off-by: my-vegetable-has-exploded --- historyserver/test/e2e/collector_test.go | 69 ++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 3fe16341405..61281618c68 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -53,6 +53,10 @@ func TestCollector(t *testing.T) { name: "Simulate OOMKilled behavior: Single session single node logs and events should be uploaded to S3 after the ray-head container is restarted", testFunc: testCollectorSeparatesFilesBySession, }, + { + name: "Collector restart: should scan prev-logs and resume uploads left by a crash", + testFunc: testCollectorResumesUploadsOnRestart, + }, } for _, tt := range tests { @@ -473,3 +477,68 @@ func getContainerStatusByName(pod *corev1.Pod, containerName string) (*corev1.Co } return nil, fmt.Errorf("container %s not found in pod %s/%s", containerName, pod.Namespace, pod.Name) } + +// testCollectorResumesUploadsOnRestart verifies that the Collector scans and resumes uploads from the prev-logs directory on startup. +func testCollectorResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) { + rayCluster := prepareTestEnv(test, g, namespace, s3Client) + + // Directory variables for easier maintenance + prevLogsBaseDir := "/tmp/ray/prev-logs" + persistCompleteBaseDir := "/tmp/ray/persist-complete-logs" + + dummySessionID := "test-recovery-session" + dummyNodeID := "head-node" + clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID) + sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, dummySessionID) + + // 1. Kill the collector container to trigger a restart. + headPod, err := GetHeadPod(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Killing collector container to test startup scanning of prev-logs") + _, _ = ExecPodCmd(test, headPod, "collector", []string{"kill", "1"}) + + // 2. Inject "leftover" logs and events into prev-logs via the ray-head container while collector is down. + LogWithTimestamp(test.T(), "Injecting logs and events into %s while collector is down", prevLogsBaseDir) + sessionDir := filepath.Join(prevLogsBaseDir, dummySessionID, dummyNodeID) + injectCmd := fmt.Sprintf( + "mkdir -p %s/logs && echo 'dummy log' > %s/logs/test.log && mkdir -p %s/node_events && echo '{\"event\":\"test\"}' > %s/node_events/test.json", + sessionDir, sessionDir, sessionDir, sessionDir, + ) + _, stderr := ExecPodCmd(test, headPod, "ray-head", []string{"sh", "-c", injectCmd}) + g.Expect(stderr.String()).To(BeEmpty()) + + // 3. Wait for collector container to restart and become ready. + LogWithTimestamp(test.T(), "Waiting for collector container to restart and become ready") + g.Eventually(func(gg Gomega) { + updatedPod, err := GetHeadPod(test, rayCluster) + gg.Expect(err).NotTo(HaveOccurred()) + cs, err := getContainerStatusByName(updatedPod, "collector") + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(cs.RestartCount).To(BeNumerically(">", 0)) + gg.Expect(cs.Ready).To(BeTrue()) + }, TestTimeoutMedium).Should(Succeed()) + + // 4. Verify S3 uploads using the existing verifyS3SessionDirs helper. + LogWithTimestamp(test.T(), "Verifying scanning logic: checking S3 for recovered files") + verifyS3SessionDirs(test, g, s3Client, sessionPrefix, dummyNodeID) + + // 5. Verify local state: the session should be moved from prev-logs to persist-complete-logs. + LogWithTimestamp(test.T(), "Verifying local state: session should be moved to %s", persistCompleteBaseDir) + g.Eventually(func(gg Gomega) { + currentHeadPod, err := GetHeadPod(test, rayCluster) + gg.Expect(err).NotTo(HaveOccurred()) + // Check that the session directory exists in persist-complete-logs + persistPath := filepath.Join(persistCompleteBaseDir, dummySessionID) + checkCmd := fmt.Sprintf("test -d %s && echo 'exists'", persistPath) + stdout, _ := ExecPodCmd(test, currentHeadPod, "ray-head", []string{"sh", "-c", checkCmd}) + gg.Expect(strings.TrimSpace(stdout.String())).To(Equal("exists"), "Session directory should be in persist-complete-logs") + + // Check that the session directory is gone from prev-logs + prevPath := filepath.Join(prevLogsBaseDir, dummySessionID) + checkGoneCmd := fmt.Sprintf("test ! -d %s && echo 'gone'", prevPath) + stdoutGone, _ := ExecPodCmd(test, currentHeadPod, "ray-head", []string{"sh", "-c", checkGoneCmd}) + gg.Expect(strings.TrimSpace(stdoutGone.String())).To(Equal("gone"), "Session directory should be cleaned from prev-logs") + }, TestTimeoutMedium).Should(Succeed()) + + deleteS3Bucket(test, g, s3Client) +} From 7d45c038ef33a9c89bf6d38727ceee721706f559 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Wed, 7 Jan 2026 16:27:40 +0000 Subject: [PATCH 06/17] fix e2e test. Signed-off-by: my-vegetable-has-exploded --- historyserver/test/e2e/collector_test.go | 49 ++++++++++++++++-------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 61281618c68..709abbbe812 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -64,6 +64,14 @@ func TestCollector(t *testing.T) { test := With(t) g := NewWithT(t) namespace := test.NewTestNamespace() + test.T().Cleanup(func() { + err := test.Client().Core().CoreV1().Namespaces().Delete(test.Ctx(), namespace.Name, metav1.DeleteOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + test.T().Logf("Failed to delete namespace %s: %v", namespace.Name, err) + } else { + LogWithTimestamp(test.T(), "Deleted test namespace %s successfully", namespace.Name) + } + }) tt.testFunc(test, g, namespace, s3Client) }) @@ -108,7 +116,7 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) // Verify logs and node_events are successfully uploaded to S3. - verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID) + verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID, false) // Delete S3 bucket to ensure test isolation. deleteS3Bucket(test, g, s3Client) @@ -183,7 +191,7 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 } // Verify logs and node_events are successfully uploaded to S3. - verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID) + verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID, false) deleteS3Bucket(test, g, s3Client) } @@ -378,8 +386,12 @@ func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayC // Additionally, it verifies that specific files have content: // - logs//raylet.out must exist and have content > 0 bytes // - node_events/_ must exist and have content > 0 bytes (suffix can be ignored for verification) -func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, nodeID string) { - dirs := []string{"logs", "node_events"} +// If skipNodeEvents is true, node_events directory verification will be skipped. +func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, nodeID string, skipNodeEvents bool) { + dirs := []string{"logs"} + if !skipNodeEvents { + dirs = append(dirs, "node_events") + } for _, dir := range dirs { dirPrefix := sessionPrefix + dir + "/" @@ -497,12 +509,14 @@ func testCollectorResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1 LogWithTimestamp(test.T(), "Killing collector container to test startup scanning of prev-logs") _, _ = ExecPodCmd(test, headPod, "collector", []string{"kill", "1"}) - // 2. Inject "leftover" logs and events into prev-logs via the ray-head container while collector is down. - LogWithTimestamp(test.T(), "Injecting logs and events into %s while collector is down", prevLogsBaseDir) + // 2. Inject "leftover" logs into prev-logs via the ray-head container while collector is down. + // Note: We only inject logs, not node_events, because the collector's prev-logs processing + // currently only handles the logs directory. node_events are handled by the EventServer separately. + LogWithTimestamp(test.T(), "Injecting logs into %s while collector is down", prevLogsBaseDir) sessionDir := filepath.Join(prevLogsBaseDir, dummySessionID, dummyNodeID) injectCmd := fmt.Sprintf( - "mkdir -p %s/logs && echo 'dummy log' > %s/logs/test.log && mkdir -p %s/node_events && echo '{\"event\":\"test\"}' > %s/node_events/test.json", - sessionDir, sessionDir, sessionDir, sessionDir, + "mkdir -p %s/logs && echo 'dummy log' > %s/logs/test.log", + sessionDir, sessionDir, ) _, stderr := ExecPodCmd(test, headPod, "ray-head", []string{"sh", "-c", injectCmd}) g.Expect(stderr.String()).To(BeEmpty()) @@ -519,25 +533,26 @@ func testCollectorResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1 }, TestTimeoutMedium).Should(Succeed()) // 4. Verify S3 uploads using the existing verifyS3SessionDirs helper. + // Skip node_events verification since prev-logs processing only handles logs directory. LogWithTimestamp(test.T(), "Verifying scanning logic: checking S3 for recovered files") - verifyS3SessionDirs(test, g, s3Client, sessionPrefix, dummyNodeID) + verifyS3SessionDirs(test, g, s3Client, sessionPrefix, dummyNodeID, true) - // 5. Verify local state: the session should be moved from prev-logs to persist-complete-logs. - LogWithTimestamp(test.T(), "Verifying local state: session should be moved to %s", persistCompleteBaseDir) + // 5. Verify local state: the node directory should be moved from prev-logs to persist-complete-logs. + LogWithTimestamp(test.T(), "Verifying local state: node directory should be moved to %s", persistCompleteBaseDir) g.Eventually(func(gg Gomega) { currentHeadPod, err := GetHeadPod(test, rayCluster) gg.Expect(err).NotTo(HaveOccurred()) - // Check that the session directory exists in persist-complete-logs - persistPath := filepath.Join(persistCompleteBaseDir, dummySessionID) + // Check that the node directory exists in persist-complete-logs + persistPath := filepath.Join(persistCompleteBaseDir, dummySessionID, dummyNodeID) checkCmd := fmt.Sprintf("test -d %s && echo 'exists'", persistPath) stdout, _ := ExecPodCmd(test, currentHeadPod, "ray-head", []string{"sh", "-c", checkCmd}) - gg.Expect(strings.TrimSpace(stdout.String())).To(Equal("exists"), "Session directory should be in persist-complete-logs") + gg.Expect(strings.TrimSpace(stdout.String())).To(Equal("exists"), "Node directory should be in persist-complete-logs") - // Check that the session directory is gone from prev-logs - prevPath := filepath.Join(prevLogsBaseDir, dummySessionID) + // Check that the node directory is gone from prev-logs + prevPath := filepath.Join(prevLogsBaseDir, dummySessionID, dummyNodeID) checkGoneCmd := fmt.Sprintf("test ! -d %s && echo 'gone'", prevPath) stdoutGone, _ := ExecPodCmd(test, currentHeadPod, "ray-head", []string{"sh", "-c", checkGoneCmd}) - gg.Expect(strings.TrimSpace(stdoutGone.String())).To(Equal("gone"), "Session directory should be cleaned from prev-logs") + gg.Expect(strings.TrimSpace(stdoutGone.String())).To(Equal("gone"), "Node directory should be cleaned from prev-logs") }, TestTimeoutMedium).Should(Succeed()) deleteS3Bucket(test, g, s3Client) From 1f59c9910bc6d830e829d47fb177c7497b3f4541 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Wed, 7 Jan 2026 16:32:17 +0000 Subject: [PATCH 07/17] add Troubleshooting. Signed-off-by: my-vegetable-has-exploded --- historyserver/docs/set_up_collector.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/historyserver/docs/set_up_collector.md b/historyserver/docs/set_up_collector.md index d17b62c3065..e7de505412e 100644 --- a/historyserver/docs/set_up_collector.md +++ b/historyserver/docs/set_up_collector.md @@ -141,3 +141,25 @@ kubectl delete -f historyserver/config/raycluster.yaml You're supposed to see the uploaded logs and events in the minio UI as below: ![write_logs_and_events](https://github.com/ray-project/kuberay/blob/db7cb864061518ed4cfa7bf48cf05cfbfeb49f95/historyserver/docs/assets/write_logs_and_events.png) + +## Troubleshooting + +### "too many open files" error +If you encounter `level=fatal msg="Create fsnotify NewWatcher error too many open files"` in the collector logs, it is likely due to the inotify limits on the Kubernetes nodes. + +To fix this, increase the limits on the **host nodes** (not inside the container): + +```bash +# Apply changes immediately +sudo sysctl -w fs.inotify.max_user_instances=8192 +sudo sysctl -w fs.inotify.max_user_watches=524288 +``` + +To make these changes persistent across reboots, use the following lines: + +```text +echo "fs.inotify.max_user_instances=8192" | sudo tee -a /etc/sysctl.conf +echo "fs.inotify.max_user_watches=524288" | sudo tee -a /etc/sysctl.conf +sudo sysctl -p +``` + From a04b9d7e6df5b1576d0a36de9973080f347ff7fb Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Thu, 8 Jan 2026 02:07:47 +0000 Subject: [PATCH 08/17] rm redundant cleanup. Signed-off-by: my-vegetable-has-exploded --- historyserver/test/e2e/collector_test.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 709abbbe812..192e292a193 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -64,14 +64,6 @@ func TestCollector(t *testing.T) { test := With(t) g := NewWithT(t) namespace := test.NewTestNamespace() - test.T().Cleanup(func() { - err := test.Client().Core().CoreV1().Namespaces().Delete(test.Ctx(), namespace.Name, metav1.DeleteOptions{}) - if err != nil && !k8serrors.IsNotFound(err) { - test.T().Logf("Failed to delete namespace %s: %v", namespace.Name, err) - } else { - LogWithTimestamp(test.T(), "Deleted test namespace %s successfully", namespace.Name) - } - }) tt.testFunc(test, g, namespace, s3Client) }) From 38812d7516c4b7909138224b28042556221fa508 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Fri, 9 Jan 2026 02:38:49 +0000 Subject: [PATCH 09/17] reuse WatchPrevLogsLoops to scan existing logs. Signed-off-by: my-vegetable-has-exploded --- .../runtime/logcollector/collector.go | 43 +------------------ .../runtime/logcollector/collector_test.go | 2 +- 2 files changed, 3 insertions(+), 42 deletions(-) diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go index 296640187a6..a12a1b2c22d 100644 --- a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go +++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go @@ -65,10 +65,8 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGTERM) - // On startup, scan and process all existing prev-logs - // This ensures that any incomplete uploads from previous runs are resumed - go r.scanAndProcessExistingPrevLogs() - + // WatchPrevLogsLoops will scan and process all existing prev-logs on startup, + // then watch for new files. This ensures incomplete uploads from previous runs are resumed. go r.WatchPrevLogsLoops() if r.EnableMeta { go r.WatchSessionLatestLoops() // Watch session_latest symlink changes @@ -300,43 +298,6 @@ func (r *RayLogHandler) WatchLogsLoops(watcher *fsnotify.Watcher, walkPath strin } } -// scanAndProcessExistingPrevLogs scans the prev-logs directory on startup -// and processes any existing sessions/nodes that may have been left from previous runs. -// This ensures resumption of incomplete uploads after collector restart. -func (r *RayLogHandler) scanAndProcessExistingPrevLogs() { - watchPath := r.prevLogsDir - - logrus.Infof("Starting initial scan of prev-logs directory: %s", watchPath) - - // Check if prev-logs directory exists - if _, err := os.Stat(watchPath); os.IsNotExist(err) { - logrus.Infof("prev-logs directory does not exist on startup, nothing to process") - return - } - - // Read all session directories (first level only) - sessionEntries, err := os.ReadDir(watchPath) - if err != nil { - logrus.Errorf("Failed to read prev-logs directory %s: %v", watchPath, err) - return - } - - for _, sessionEntry := range sessionEntries { - if !sessionEntry.IsDir() { - continue - } - - sessionID := sessionEntry.Name() - sessionPath := filepath.Join(watchPath, sessionID) - logrus.Infof("Found existing session directory on startup: %s", sessionID) - - // Process all node directories under this session - r.processSessionPrevLogs(sessionPath) - } - - logrus.Infof("Completed initial scan of prev-logs directory") -} - func (r *RayLogHandler) WatchPrevLogsLoops() { watchPath := r.prevLogsDir diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go index c8235705232..cec8c439476 100644 --- a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go +++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go @@ -155,7 +155,7 @@ func TestScanAndProcess(t *testing.T) { createTestLogFile(t, f1, "content1") // --- Step 2: Run startup scan --- - handler.scanAndProcessExistingPrevLogs() + handler.WatchPrevLogsLoops() // Wait for async processing time.Sleep(200 * time.Millisecond) From 56ff9b469a846bf6653fea7fe377a6ff898f3ed1 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Fri, 9 Jan 2026 07:18:50 +0000 Subject: [PATCH 10/17] simulate partial upload in e2e test. Signed-off-by: my-vegetable-has-exploded --- historyserver/test/e2e/collector_test.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 192e292a193..b2fe9210f49 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -490,8 +490,9 @@ func testCollectorResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1 prevLogsBaseDir := "/tmp/ray/prev-logs" persistCompleteBaseDir := "/tmp/ray/persist-complete-logs" - dummySessionID := "test-recovery-session" - dummyNodeID := "head-node" + // Use namespace name to ensure test isolation (avoid conflicts from previous test runs) + dummySessionID := fmt.Sprintf("test-recovery-session-%s", namespace.Name) + dummyNodeID := fmt.Sprintf("head-node-%s", namespace.Name) clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID) sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, dummySessionID) @@ -506,9 +507,20 @@ func testCollectorResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1 // currently only handles the logs directory. node_events are handled by the EventServer separately. LogWithTimestamp(test.T(), "Injecting logs into %s while collector is down", prevLogsBaseDir) sessionDir := filepath.Join(prevLogsBaseDir, dummySessionID, dummyNodeID) + persistDir := filepath.Join(persistCompleteBaseDir, dummySessionID, dummyNodeID) + // Inject 2 files and simulate partial processing: move file1 to persist-complete-logs injectCmd := fmt.Sprintf( - "mkdir -p %s/logs && echo 'dummy log' > %s/logs/test.log", - sessionDir, sessionDir, + "mkdir -p %s/logs && "+ + "echo 'file1 content' > %s/logs/file1.log && "+ + "echo 'file2 content' > %s/logs/file2.log && "+ + "mkdir -p %s/logs && "+ + "mv %s/logs/file1.log %s/logs/file1.log", + sessionDir, + sessionDir, + sessionDir, + persistDir, + sessionDir, + persistDir, ) _, stderr := ExecPodCmd(test, headPod, "ray-head", []string{"sh", "-c", injectCmd}) g.Expect(stderr.String()).To(BeEmpty()) From 75566a2af31a08d7b180857c1659aed374055610 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Fri, 9 Jan 2026 07:37:43 +0000 Subject: [PATCH 11/17] fix unit test. Signed-off-by: my-vegetable-has-exploded --- .../runtime/logcollector/collector_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go index cec8c439476..5f61995fb39 100644 --- a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go +++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go @@ -154,11 +154,11 @@ func TestScanAndProcess(t *testing.T) { // Manually restore file1 to prev-logs to simulate a crash right after upload but before/during rename createTestLogFile(t, f1, "content1") - // --- Step 2: Run startup scan --- - handler.WatchPrevLogsLoops() + // --- Step 2: Run startup scan in background --- + go handler.WatchPrevLogsLoops() - // Wait for async processing - time.Sleep(200 * time.Millisecond) + // Wait for async processing - give more time for file2.log to be processed + time.Sleep(1000 * time.Millisecond) // --- Step 3: Final Verification --- // 1. Storage should have 2 unique files (file1 should NOT be re-uploaded) @@ -171,4 +171,8 @@ func TestScanAndProcess(t *testing.T) { if _, err := os.Stat(sessionNodeDir); !os.IsNotExist(err) { t.Error("Node directory should be removed after all files are processed and moved") } + + // Ensure background goroutine exits + close(handler.ShutdownChan) + time.Sleep(100 * time.Millisecond) } From 9221529ff6dde967b791300ca1fb99657b6a0c54 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Fri, 9 Jan 2026 08:11:24 +0000 Subject: [PATCH 12/17] fix lint Signed-off-by: my-vegetable-has-exploded --- historyserver/docs/set_up_collector.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/historyserver/docs/set_up_collector.md b/historyserver/docs/set_up_collector.md index e7de505412e..a59a5588431 100644 --- a/historyserver/docs/set_up_collector.md +++ b/historyserver/docs/set_up_collector.md @@ -145,7 +145,9 @@ You're supposed to see the uploaded logs and events in the minio UI as below: ## Troubleshooting ### "too many open files" error -If you encounter `level=fatal msg="Create fsnotify NewWatcher error too many open files"` in the collector logs, it is likely due to the inotify limits on the Kubernetes nodes. + +If you encounter `level=fatal msg="Create fsnotify NewWatcher error too many open files"` in the collector logs, +it is likely due to the inotify limits on the Kubernetes nodes. To fix this, increase the limits on the **host nodes** (not inside the container): @@ -162,4 +164,3 @@ echo "fs.inotify.max_user_instances=8192" | sudo tee -a /etc/sysctl.conf echo "fs.inotify.max_user_watches=524288" | sudo tee -a /etc/sysctl.conf sudo sysctl -p ``` - From 06e0dddd33af4f6c1bee13754087a89e4bf673e6 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Fri, 9 Jan 2026 09:08:06 +0000 Subject: [PATCH 13/17] fix mv race condition in e2e test. Signed-off-by: my-vegetable-has-exploded --- historyserver/test/e2e/collector_test.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index b2fe9210f49..9a05cd6a1a7 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -508,19 +508,18 @@ func testCollectorResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1 LogWithTimestamp(test.T(), "Injecting logs into %s while collector is down", prevLogsBaseDir) sessionDir := filepath.Join(prevLogsBaseDir, dummySessionID, dummyNodeID) persistDir := filepath.Join(persistCompleteBaseDir, dummySessionID, dummyNodeID) - // Inject 2 files and simulate partial processing: move file1 to persist-complete-logs + // Inject 2 files to simulate partial processing: + // - file1.log in persist-complete-logs (already processed and uploaded) + // - file2.log in prev-logs (pending processing) injectCmd := fmt.Sprintf( "mkdir -p %s/logs && "+ "echo 'file1 content' > %s/logs/file1.log && "+ - "echo 'file2 content' > %s/logs/file2.log && "+ "mkdir -p %s/logs && "+ - "mv %s/logs/file1.log %s/logs/file1.log", - sessionDir, - sessionDir, - sessionDir, + "echo 'file2 content' > %s/logs/file2.log", persistDir, - sessionDir, persistDir, + sessionDir, + sessionDir, ) _, stderr := ExecPodCmd(test, headPod, "ray-head", []string{"sh", "-c", injectCmd}) g.Expect(stderr.String()).To(BeEmpty()) From 8f4f4e61060070a99cbf3cedf88e444a3f00140b Mon Sep 17 00:00:00 2001 From: yi wang <48236141+my-vegetable-has-exploded@users.noreply.github.com> Date: Sat, 10 Jan 2026 16:58:28 +0800 Subject: [PATCH 14/17] Apply suggestion from @JiangJiaWei1103 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 江家瑋 <36886416+JiangJiaWei1103@users.noreply.github.com> Signed-off-by: yi wang <48236141+my-vegetable-has-exploded@users.noreply.github.com> --- .../logcollector/runtime/logcollector/collector.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go index a12a1b2c22d..df3ee60c3de 100644 --- a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go +++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go @@ -65,8 +65,10 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGTERM) - // WatchPrevLogsLoops will scan and process all existing prev-logs on startup, - // then watch for new files. This ensures incomplete uploads from previous runs are resumed. + // WatchPrevLogsLoops performs an initial scan of the prev-logs directory on startup + // to process leftover log files in prev-logs/{sessionID}/{nodeID}/logs/ directories. + // After scanning, it watches for new directories and files. This ensures incomplete + // uploads from previous runs are resumed. go r.WatchPrevLogsLoops() if r.EnableMeta { go r.WatchSessionLatestLoops() // Watch session_latest symlink changes From 8de439de8fdb8bdcb4b31c46da110e784d54ff19 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Sat, 10 Jan 2026 09:21:48 +0000 Subject: [PATCH 15/17] address comments. Signed-off-by: my-vegetable-has-exploded --- .../runtime/logcollector/collector.go | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go index df3ee60c3de..57d0a188023 100644 --- a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go +++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go @@ -65,10 +65,10 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGTERM) - // WatchPrevLogsLoops performs an initial scan of the prev-logs directory on startup - // to process leftover log files in prev-logs/{sessionID}/{nodeID}/logs/ directories. - // After scanning, it watches for new directories and files. This ensures incomplete - // uploads from previous runs are resumed. + // WatchPrevLogsLoops performs an initial scan of the prev-logs directory on startup + // to process leftover log files in prev-logs/{sessionID}/{nodeID}/logs/ directories. + // After scanning, it watches for new directories and files. This ensures incomplete + // uploads from previous runs are resumed. go r.WatchPrevLogsLoops() if r.EnableMeta { go r.WatchSessionLatestLoops() // Watch session_latest symlink changes @@ -578,18 +578,30 @@ func (r *RayLogHandler) processSessionPrevLogs(sessionDir string) { } } -// isFileAlreadyPersisted checks if a file has already been persisted to persist-complete-logs +// isFileAlreadyPersisted checks if a log file has already been uploaded to storage and moved to +// the persist-complete-logs directory. This prevents duplicate uploads during collector restarts. +// +// When a log file is successfully uploaded, it is moved from prev-logs to persist-complete-logs +// to mark it as processed. This function checks if the equivalent file path exists in the +// persist-complete-logs directory. +// +// Example: +// +// Given absoluteLogPath = "/tmp/ray/prev-logs/session_123/node_456/logs/raylet.out" +// This function checks if "/tmp/ray/persist-complete-logs/session_123/node_456/logs/raylet.out" exists +// - If exists: returns true (file was already uploaded, skip it) +// - If not exists: returns false (file needs to be uploaded) func (r *RayLogHandler) isFileAlreadyPersisted(absoluteLogPath, sessionID, nodeID string) bool { // Calculate the relative path within the logs directory logsDir := filepath.Join(r.prevLogsDir, sessionID, nodeID, "logs") - relativePath, err := filepath.Rel(logsDir, absoluteLogPath) + relativeLogPath, err := filepath.Rel(logsDir, absoluteLogPath) if err != nil { logrus.Errorf("Failed to get relative path for %s: %v", absoluteLogPath, err) return false } // Construct the path in persist-complete-logs - persistedPath := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID, "logs", relativePath) + persistedPath := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID, "logs", relativeLogPath) // Check if the file exists if _, err := os.Stat(persistedPath); err == nil { From a8604d064e4aba907068e1cb6a5bbd86d7309b43 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Sun, 11 Jan 2026 10:45:56 +0000 Subject: [PATCH 16/17] e2e test: add assertions and update description Signed-off-by: my-vegetable-has-exploded --- historyserver/test/e2e/collector_test.go | 172 ++++++++++++----------- 1 file changed, 93 insertions(+), 79 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 9a05cd6a1a7..54b5b5dbf28 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -188,6 +188,99 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 deleteS3Bucket(test, g, s3Client) } +// testCollectorResumesUploadsOnRestart verifies that the Collector scans and resumes uploads from +// the prev-logs directory on startup. +// +// The test case follows these steps: +// 1. Prepare test environment by applying a Ray cluster with the collector and ensuring an empty S3 bucket. +// 2. Kill the collector sidecar container to trigger a container restart. +// 3. Inject leftover logs while the collector is down: +// - file1.log -> /tmp/ray/persist-complete-logs/{sessionID}/{nodeID}/logs/ (already uploaded) +// - file2.log -> /tmp/ray/prev-logs/{sessionID}/{nodeID}/logs/ (pending upload) +// Note: node_events are not injected or verified here; they are handled by the EventServer via a separate path, +// and prev-logs processing only covers the logs directory. +// +// 4. Wait for the collector container to restart and become Ready. +// 5. Verify S3 uploads: recovered log objects exist under log/{clusterName}_{clusterID}/{sessionID}/logs/ and have content. +// 6. Verify local state: the node directory is present under persist-complete-logs and removed from prev-logs. +// 7. Clean up the S3 bucket to ensure test isolation. +func testCollectorResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) { + rayCluster := prepareTestEnv(test, g, namespace, s3Client) + + // Directory variables for easier maintenance + prevLogsBaseDir := "/tmp/ray/prev-logs" + persistCompleteBaseDir := "/tmp/ray/persist-complete-logs" + + // Use namespace name to ensure test isolation (avoid conflicts from previous test runs) + dummySessionID := fmt.Sprintf("test-recovery-session-%s", namespace.Name) + dummyNodeID := fmt.Sprintf("head-node-%s", namespace.Name) + clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID) + sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, dummySessionID) + + // Kill the collector container to trigger a restart. + headPod, err := GetHeadPod(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Killing collector container to test startup scanning of prev-logs") + _, stderrKill := ExecPodCmd(test, headPod, "collector", []string{"kill", "1"}) + g.Expect(stderrKill.String()).To(BeEmpty()) + + // Inject "leftover" logs into prev-logs via the ray-head container while collector is down. + LogWithTimestamp(test.T(), "Injecting logs into %s while collector is down", prevLogsBaseDir) + sessionDir := filepath.Join(prevLogsBaseDir, dummySessionID, dummyNodeID) + persistDir := filepath.Join(persistCompleteBaseDir, dummySessionID, dummyNodeID) + // Inject 2 files to simulate partial processing: + injectCmd := fmt.Sprintf( + "mkdir -p %s/logs && "+ + "echo 'file1 content' > %s/logs/file1.log && "+ + "mkdir -p %s/logs && "+ + "echo 'file2 content' > %s/logs/file2.log", + persistDir, + persistDir, + sessionDir, + sessionDir, + ) + _, stderr := ExecPodCmd(test, headPod, "ray-head", []string{"sh", "-c", injectCmd}) + g.Expect(stderr.String()).To(BeEmpty()) + + // Wait for collector container to restart and become ready. + LogWithTimestamp(test.T(), "Waiting for collector container to restart and become ready") + g.Eventually(func(gg Gomega) { + updatedPod, err := GetHeadPod(test, rayCluster) + gg.Expect(err).NotTo(HaveOccurred()) + cs, err := getContainerStatusByName(updatedPod, "collector") + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(cs.RestartCount).To(BeNumerically(">", 0)) + gg.Expect(cs.Ready).To(BeTrue()) + }, TestTimeoutMedium).Should(Succeed()) + + // Verify S3 uploads using the existing verifyS3SessionDirs helper. + // Skip node_events verification since prev-logs processing only handles logs directory. + LogWithTimestamp(test.T(), "Verifying scanning logic: checking S3 for recovered files") + verifyS3SessionDirs(test, g, s3Client, sessionPrefix, dummyNodeID, true) + + // Verify local state: the node directory should be moved from prev-logs to persist-complete-logs. + LogWithTimestamp(test.T(), "Verifying local state: node directory should be moved to %s", persistCompleteBaseDir) + g.Eventually(func(gg Gomega) { + currentHeadPod, err := GetHeadPod(test, rayCluster) + gg.Expect(err).NotTo(HaveOccurred()) + // Check that the node directory exists in persist-complete-logs + persistPath := filepath.Join(persistCompleteBaseDir, dummySessionID, dummyNodeID) + checkCmd := fmt.Sprintf("test -d %s && echo 'exists'", persistPath) + stdout, stderrCheck := ExecPodCmd(test, currentHeadPod, "ray-head", []string{"sh", "-c", checkCmd}) + gg.Expect(stderrCheck.String()).To(BeEmpty()) + gg.Expect(strings.TrimSpace(stdout.String())).To(Equal("exists"), "Node directory should be in persist-complete-logs") + + // Check that the node directory is gone from prev-logs + prevPath := filepath.Join(prevLogsBaseDir, dummySessionID, dummyNodeID) + checkGoneCmd := fmt.Sprintf("test ! -d %s && echo 'gone'", prevPath) + stdoutGone, stderrGone := ExecPodCmd(test, currentHeadPod, "ray-head", []string{"sh", "-c", checkGoneCmd}) + gg.Expect(stderrGone.String()).To(BeEmpty()) + gg.Expect(strings.TrimSpace(stdoutGone.String())).To(Equal("gone"), "Node directory should be cleaned from prev-logs") + }, TestTimeoutMedium).Should(Succeed()) + + deleteS3Bucket(test, g, s3Client) +} + // ensureS3Client creates an S3 client and ensures API endpoint accessibility. func ensureS3Client(t *testing.T) *s3.S3 { test := With(t) @@ -481,82 +574,3 @@ func getContainerStatusByName(pod *corev1.Pod, containerName string) (*corev1.Co } return nil, fmt.Errorf("container %s not found in pod %s/%s", containerName, pod.Namespace, pod.Name) } - -// testCollectorResumesUploadsOnRestart verifies that the Collector scans and resumes uploads from the prev-logs directory on startup. -func testCollectorResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) { - rayCluster := prepareTestEnv(test, g, namespace, s3Client) - - // Directory variables for easier maintenance - prevLogsBaseDir := "/tmp/ray/prev-logs" - persistCompleteBaseDir := "/tmp/ray/persist-complete-logs" - - // Use namespace name to ensure test isolation (avoid conflicts from previous test runs) - dummySessionID := fmt.Sprintf("test-recovery-session-%s", namespace.Name) - dummyNodeID := fmt.Sprintf("head-node-%s", namespace.Name) - clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID) - sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, dummySessionID) - - // 1. Kill the collector container to trigger a restart. - headPod, err := GetHeadPod(test, rayCluster) - g.Expect(err).NotTo(HaveOccurred()) - LogWithTimestamp(test.T(), "Killing collector container to test startup scanning of prev-logs") - _, _ = ExecPodCmd(test, headPod, "collector", []string{"kill", "1"}) - - // 2. Inject "leftover" logs into prev-logs via the ray-head container while collector is down. - // Note: We only inject logs, not node_events, because the collector's prev-logs processing - // currently only handles the logs directory. node_events are handled by the EventServer separately. - LogWithTimestamp(test.T(), "Injecting logs into %s while collector is down", prevLogsBaseDir) - sessionDir := filepath.Join(prevLogsBaseDir, dummySessionID, dummyNodeID) - persistDir := filepath.Join(persistCompleteBaseDir, dummySessionID, dummyNodeID) - // Inject 2 files to simulate partial processing: - // - file1.log in persist-complete-logs (already processed and uploaded) - // - file2.log in prev-logs (pending processing) - injectCmd := fmt.Sprintf( - "mkdir -p %s/logs && "+ - "echo 'file1 content' > %s/logs/file1.log && "+ - "mkdir -p %s/logs && "+ - "echo 'file2 content' > %s/logs/file2.log", - persistDir, - persistDir, - sessionDir, - sessionDir, - ) - _, stderr := ExecPodCmd(test, headPod, "ray-head", []string{"sh", "-c", injectCmd}) - g.Expect(stderr.String()).To(BeEmpty()) - - // 3. Wait for collector container to restart and become ready. - LogWithTimestamp(test.T(), "Waiting for collector container to restart and become ready") - g.Eventually(func(gg Gomega) { - updatedPod, err := GetHeadPod(test, rayCluster) - gg.Expect(err).NotTo(HaveOccurred()) - cs, err := getContainerStatusByName(updatedPod, "collector") - gg.Expect(err).NotTo(HaveOccurred()) - gg.Expect(cs.RestartCount).To(BeNumerically(">", 0)) - gg.Expect(cs.Ready).To(BeTrue()) - }, TestTimeoutMedium).Should(Succeed()) - - // 4. Verify S3 uploads using the existing verifyS3SessionDirs helper. - // Skip node_events verification since prev-logs processing only handles logs directory. - LogWithTimestamp(test.T(), "Verifying scanning logic: checking S3 for recovered files") - verifyS3SessionDirs(test, g, s3Client, sessionPrefix, dummyNodeID, true) - - // 5. Verify local state: the node directory should be moved from prev-logs to persist-complete-logs. - LogWithTimestamp(test.T(), "Verifying local state: node directory should be moved to %s", persistCompleteBaseDir) - g.Eventually(func(gg Gomega) { - currentHeadPod, err := GetHeadPod(test, rayCluster) - gg.Expect(err).NotTo(HaveOccurred()) - // Check that the node directory exists in persist-complete-logs - persistPath := filepath.Join(persistCompleteBaseDir, dummySessionID, dummyNodeID) - checkCmd := fmt.Sprintf("test -d %s && echo 'exists'", persistPath) - stdout, _ := ExecPodCmd(test, currentHeadPod, "ray-head", []string{"sh", "-c", checkCmd}) - gg.Expect(strings.TrimSpace(stdout.String())).To(Equal("exists"), "Node directory should be in persist-complete-logs") - - // Check that the node directory is gone from prev-logs - prevPath := filepath.Join(prevLogsBaseDir, dummySessionID, dummyNodeID) - checkGoneCmd := fmt.Sprintf("test ! -d %s && echo 'gone'", prevPath) - stdoutGone, _ := ExecPodCmd(test, currentHeadPod, "ray-head", []string{"sh", "-c", checkGoneCmd}) - gg.Expect(strings.TrimSpace(stdoutGone.String())).To(Equal("gone"), "Node directory should be cleaned from prev-logs") - }, TestTimeoutMedium).Should(Succeed()) - - deleteS3Bucket(test, g, s3Client) -} From 38486d0718d576aa0dc0a3990d3136b2b89ceda3 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 14 Jan 2026 16:09:07 +0800 Subject: [PATCH 17/17] Better test Signed-off-by: Future-Outlier --- .../runtime/logcollector/collector_test.go | 61 ++++++++++++------- historyserver/test/e2e/collector_test.go | 57 +++++++++++++---- 2 files changed, 86 insertions(+), 32 deletions(-) diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go index 5f61995fb39..c3f76fadf41 100644 --- a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go +++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go @@ -7,6 +7,8 @@ import ( "sync" "testing" "time" + + . "github.com/onsi/gomega" ) // MockStorageWriter is a mock implementation of storage.StorageWriter for testing @@ -114,8 +116,18 @@ func TestIsFileAlreadyPersisted(t *testing.T) { } } -// TestScanAndProcess tests the full lifecycle: partial upload, interruption, and resumption via scan +// TestScanAndProcess tests the full lifecycle: partial upload, interruption, and resumption via scan. +// +// This test simulates a crash recovery scenario: +// 1. Two log files exist in prev-logs +// 2. Only file1 is processed (simulating partial success before crash) +// 3. File1 is restored to prev-logs (simulating incomplete rename during crash) +// 4. WatchPrevLogsLoops is started (simulating collector restart) +// 5. Verify that file1 is NOT re-uploaded (idempotency) and file2 is uploaded +// 6. Verify that the node directory is cleaned up after all files are processed func TestScanAndProcess(t *testing.T) { + g := NewWithT(t) + baseDir, cleanup := setupRayTestEnvironment(t) defer cleanup() @@ -134,45 +146,52 @@ func TestScanAndProcess(t *testing.T) { nodeID := "node-1" logsDir := filepath.Join(handler.prevLogsDir, sessionID, nodeID, "logs") - // Prepare two files + // Prepare two log files in prev-logs directory f1 := filepath.Join(logsDir, "file1.log") f2 := filepath.Join(logsDir, "file2.log") createTestLogFile(t, f1, "content1") createTestLogFile(t, f2, "content2") - // --- Step 1: Process file1 only (simulating partial success) --- + // --- Step 1: Process file1 only (simulating partial success before crash) --- err := handler.processPrevLogFile(f1, logsDir, sessionID, nodeID) if err != nil { t.Fatalf("Failed to process file1: %v", err) } - // Verify file1 is in storage + // Verify file1 is uploaded to storage if len(mockWriter.writtenFiles) != 1 { t.Errorf("Expected 1 file in storage, got %d", len(mockWriter.writtenFiles)) } - // Manually restore file1 to prev-logs to simulate a crash right after upload but before/during rename + // Manually restore file1 to prev-logs to simulate a crash right after upload + // but before the rename operation completed createTestLogFile(t, f1, "content1") - // --- Step 2: Run startup scan in background --- + // --- Step 2: Start the startup scan in background (simulating collector restart) --- go handler.WatchPrevLogsLoops() - // Wait for async processing - give more time for file2.log to be processed - time.Sleep(1000 * time.Millisecond) - - // --- Step 3: Final Verification --- - // 1. Storage should have 2 unique files (file1 should NOT be re-uploaded) - if len(mockWriter.writtenFiles) != 2 { - t.Errorf("Expected 2 unique files in storage, got %d", len(mockWriter.writtenFiles)) - } - - // 2. The node directory in prev-logs should be removed now that all files are processed + // --- Step 3: Use Eventually to wait for async processing --- sessionNodeDir := filepath.Join(handler.prevLogsDir, sessionID, nodeID) - if _, err := os.Stat(sessionNodeDir); !os.IsNotExist(err) { - t.Error("Node directory should be removed after all files are processed and moved") - } - // Ensure background goroutine exits + // Wait until storage has exactly 2 files. + // file1 should NOT be re-uploaded because it already exists in persist-complete-logs. + // Only file2 should be newly uploaded. + g.Eventually(func() int { + mockWriter.mu.Lock() + defer mockWriter.mu.Unlock() + return len(mockWriter.writtenFiles) + }, 5*time.Second, 100*time.Millisecond).Should(Equal(2), + "Storage should have 2 unique files (file1 should NOT be re-uploaded due to idempotency check)") + + // Wait until the node directory in prev-logs is removed. + // After all files are processed and moved to persist-complete-logs, + // the node directory should be cleaned up. + g.Eventually(func() bool { + _, err := os.Stat(sessionNodeDir) + return os.IsNotExist(err) + }, 5*time.Second, 100*time.Millisecond).Should(BeTrue(), + "Node directory should be removed after all files are processed and moved to persist-complete-logs") + + // Signal the background goroutine to exit gracefully close(handler.ShutdownChan) - time.Sleep(100 * time.Millisecond) } diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 54b5b5dbf28..0139fbd04bf 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -193,13 +193,13 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 // // The test case follows these steps: // 1. Prepare test environment by applying a Ray cluster with the collector and ensuring an empty S3 bucket. -// 2. Kill the collector sidecar container to trigger a container restart. -// 3. Inject leftover logs while the collector is down: +// 2. Inject leftover logs before killing the collector: // - file1.log -> /tmp/ray/persist-complete-logs/{sessionID}/{nodeID}/logs/ (already uploaded) // - file2.log -> /tmp/ray/prev-logs/{sessionID}/{nodeID}/logs/ (pending upload) // Note: node_events are not injected or verified here; they are handled by the EventServer via a separate path, // and prev-logs processing only covers the logs directory. // +// 3. Kill the collector sidecar container to trigger a container restart. // 4. Wait for the collector container to restart and become Ready. // 5. Verify S3 uploads: recovered log objects exist under log/{clusterName}_{clusterID}/{sessionID}/logs/ and have content. // 6. Verify local state: the node directory is present under persist-complete-logs and removed from prev-logs. @@ -217,18 +217,13 @@ func testCollectorResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1 clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID) sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, dummySessionID) - // Kill the collector container to trigger a restart. + // Inject "leftover" logs BEFORE killing collector. + // This ensures files exist when collector restarts and performs its initial scan. headPod, err := GetHeadPod(test, rayCluster) g.Expect(err).NotTo(HaveOccurred()) - LogWithTimestamp(test.T(), "Killing collector container to test startup scanning of prev-logs") - _, stderrKill := ExecPodCmd(test, headPod, "collector", []string{"kill", "1"}) - g.Expect(stderrKill.String()).To(BeEmpty()) - - // Inject "leftover" logs into prev-logs via the ray-head container while collector is down. - LogWithTimestamp(test.T(), "Injecting logs into %s while collector is down", prevLogsBaseDir) + LogWithTimestamp(test.T(), "Injecting logs into %s before killing collector", prevLogsBaseDir) sessionDir := filepath.Join(prevLogsBaseDir, dummySessionID, dummyNodeID) persistDir := filepath.Join(persistCompleteBaseDir, dummySessionID, dummyNodeID) - // Inject 2 files to simulate partial processing: injectCmd := fmt.Sprintf( "mkdir -p %s/logs && "+ "echo 'file1 content' > %s/logs/file1.log && "+ @@ -242,6 +237,12 @@ func testCollectorResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1 _, stderr := ExecPodCmd(test, headPod, "ray-head", []string{"sh", "-c", injectCmd}) g.Expect(stderr.String()).To(BeEmpty()) + // Kill the collector container to trigger a restart. + // When collector restarts, WatchPrevLogsLoops() will scan prev-logs and find the injected files. + LogWithTimestamp(test.T(), "Killing collector container to test startup scanning of prev-logs") + _, stderrKill := ExecPodCmd(test, headPod, "collector", []string{"kill", "1"}) + g.Expect(stderrKill.String()).To(BeEmpty()) + // Wait for collector container to restart and become ready. LogWithTimestamp(test.T(), "Waiting for collector container to restart and become ready") g.Eventually(func(gg Gomega) { @@ -256,7 +257,41 @@ func testCollectorResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1 // Verify S3 uploads using the existing verifyS3SessionDirs helper. // Skip node_events verification since prev-logs processing only handles logs directory. LogWithTimestamp(test.T(), "Verifying scanning logic: checking S3 for recovered files") - verifyS3SessionDirs(test, g, s3Client, sessionPrefix, dummyNodeID, true) + + // Verify that file2.log was actually uploaded to S3. + // file1.log should NOT be uploaded because it was already marked as "completed" in persist-complete-logs. + // file2.log should be uploaded because it was in prev-logs (pending upload). + LogWithTimestamp(test.T(), "Verifying file2.log was uploaded to S3 (idempotency check)") + g.Eventually(func(gg Gomega) { + // List all objects under the session logs prefix + logsPrefix := sessionPrefix + "logs/" + objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ + Bucket: aws.String(s3BucketName), + Prefix: aws.String(logsPrefix), + }) + gg.Expect(err).NotTo(HaveOccurred()) + + // Collect all uploaded file keys + var uploadedKeys []string + for _, obj := range objects.Contents { + uploadedKeys = append(uploadedKeys, aws.StringValue(obj.Key)) + } + LogWithTimestamp(test.T(), "Found uploaded objects: %v", uploadedKeys) + + // Verify file2.log exists in S3 (it was in prev-logs, so it should be uploaded) + hasFile2 := false + for _, key := range uploadedKeys { + if strings.HasSuffix(key, "file2.log") { + hasFile2 = true + break + } + } + gg.Expect(hasFile2).To(BeTrue(), "file2.log should be uploaded to S3 because it was in prev-logs") + + // Note: file1.log was only placed in persist-complete-logs (local marker), + // it was never actually uploaded to S3 in this test scenario. + // The persist-complete-logs directory is just a local marker to prevent re-upload. + }, TestTimeoutMedium).Should(Succeed()) // Verify local state: the node directory should be moved from prev-logs to persist-complete-logs. LogWithTimestamp(test.T(), "Verifying local state: node directory should be moved to %s", persistCompleteBaseDir)