Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,7 @@ def estimate_total_num_of_blocks(
if (
upstream_op_num_outputs > 0
and metrics.num_inputs_received > 0
and metrics.num_tasks_have_outputs > 0
and metrics.num_tasks_finished > 0
):
estimated_num_tasks = total_num_tasks
Expand All @@ -762,16 +763,29 @@ def estimate_total_num_of_blocks(
* num_tasks_submitted
)

estimated_num_output_bundles = round(
estimated_num_tasks
* metrics.num_outputs_of_finished_tasks
/ metrics.num_tasks_finished
)
estimated_output_num_rows = round(
estimated_num_tasks
* metrics.rows_task_outputs_generated
/ metrics.num_tasks_finished
)
# tasks that are yielding multiple blocks
if (
metrics.average_rows_outputs_per_task is None
or metrics.num_task_outputs_generated is None
):
estimated_num_output_bundles = (
estimated_num_tasks * metrics.num_task_outputs_generated
)
estimated_output_num_rows = (
estimated_num_tasks * metrics.rows_task_outputs_generated
)
else:
estimated_num_output_bundles = round(
estimated_num_tasks
* metrics.num_outputs_of_finished_tasks
/ metrics.num_tasks_finished
)
estimated_output_num_rows = round(
estimated_num_tasks
* metrics.rows_outputs_of_finished_tasks
/ metrics.num_tasks_finished
)

return (
estimated_num_tasks,
estimated_num_output_bundles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def __init__(
data_context: DataContext,
input_data: Optional[List[RefBundle]] = None,
input_data_factory: Optional[Callable[[int], List[RefBundle]]] = None,
num_output_blocks: Optional[int] = None,
):
"""Create an InputDataBuffer.

Expand All @@ -30,8 +29,6 @@ def __init__(
object to use ingestion.
input_data: The list of bundles to output from this operator.
input_data_factory: The factory to get input data, if input_data is None.
num_output_blocks: The number of output blocks. If not specified, progress
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not being used anywhere

bars total will be set based on num output bundles instead.
"""
super().__init__("Input", [], data_context)
if input_data is not None:
Expand Down
14 changes: 7 additions & 7 deletions python/ray/data/_internal/execution/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,13 +416,6 @@ def _output_ready_callback(
assert len(output) == 1
self._metrics.on_task_output_generated(task_index, output)

# Notify output queue that the task has produced a new output.
self._output_queue.notify_task_output_ready(task_index, output)
self._metrics.on_output_queued(output)

def _task_done_callback(task_index: int, exception: Optional[Exception]):
self._metrics.on_task_finished(task_index, exception)

# Estimate number of tasks and rows from inputs received and tasks
# submitted so far
(
Expand All @@ -433,6 +426,13 @@ def _task_done_callback(task_index: int, exception: Optional[Exception]):
self._next_data_task_idx, self.upstream_op_num_outputs(), self._metrics
)

# Notify output queue that the task has produced a new output.
self._output_queue.notify_task_output_ready(task_index, output)
self._metrics.on_output_queued(output)

def _task_done_callback(task_index: int, exception: Optional[Exception]):
self._metrics.on_task_finished(task_index, exception)

self._data_tasks.pop(task_index)
# Notify output queue that this task is complete.
self._output_queue.notify_task_completed(task_index)
Expand Down