@@ -35,9 +35,18 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
3535 context .optimize_fuse_stages = True
3636
3737 if context .new_execution_backend :
38- logger = DatasetLogger ("ray.data._internal.execution.bulk_executor" ).get_logger (
39- log_to_stdout = enable_auto_log_stats ,
40- )
38+ if context .use_streaming_executor :
39+ logger = DatasetLogger (
40+ "ray.data._internal.execution.streaming_executor"
41+ ).get_logger (
42+ log_to_stdout = enable_auto_log_stats ,
43+ )
44+ else :
45+ logger = DatasetLogger (
46+ "ray.data._internal.execution.bulk_executor"
47+ ).get_logger (
48+ log_to_stdout = enable_auto_log_stats ,
49+ )
4150 else :
4251 logger = DatasetLogger ("ray.data._internal.plan" ).get_logger (
4352 log_to_stdout = enable_auto_log_stats ,
@@ -111,9 +120,24 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
111120 stats = canonicalize (ds .fully_executed ().stats ())
112121
113122 if context .new_execution_backend :
114- assert (
115- stats
116- == """Stage N read->MapBatches(dummy_map_batches): N/N blocks executed in T
123+ if context .use_streaming_executor :
124+ assert (
125+ stats
126+ == """Stage N read->MapBatches(dummy_map_batches)->map: N/N blocks executed in T
127+ * Remote wall time: T min, T max, T mean, T total
128+ * Remote cpu time: T min, T max, T mean, T total
129+ * Peak heap memory usage (MiB): N min, N max, N mean
130+ * Output num rows: N min, N max, N mean, N total
131+ * Output size bytes: N min, N max, N mean, N total
132+ * Tasks per node: N min, N max, N mean; N nodes used
133+ * Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \
134+ 'obj_store_mem_peak': N}
135+ """
136+ )
137+ else :
138+ assert (
139+ stats
140+ == """Stage N read->MapBatches(dummy_map_batches): N/N blocks executed in T
117141* Remote wall time: T min, T max, T mean, T total
118142* Remote cpu time: T min, T max, T mean, T total
119143* Peak heap memory usage (MiB): N min, N max, N mean
@@ -141,7 +165,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
141165* In user code: T
142166* Total time: T
143167"""
144- )
168+ )
145169 else :
146170 assert (
147171 stats
@@ -364,9 +388,18 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_
364388 context .optimize_fuse_stages = True
365389
366390 if context .new_execution_backend :
367- logger = DatasetLogger ("ray.data._internal.execution.bulk_executor" ).get_logger (
368- log_to_stdout = enable_auto_log_stats ,
369- )
391+ if context .use_streaming_executor :
392+ logger = DatasetLogger (
393+ "ray.data._internal.execution.streaming_executor"
394+ ).get_logger (
395+ log_to_stdout = enable_auto_log_stats ,
396+ )
397+ else :
398+ logger = DatasetLogger (
399+ "ray.data._internal.execution.bulk_executor"
400+ ).get_logger (
401+ log_to_stdout = enable_auto_log_stats ,
402+ )
370403 else :
371404 logger = DatasetLogger ("ray.data._internal.plan" ).get_logger (
372405 log_to_stdout = enable_auto_log_stats ,
0 commit comments