Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -827,27 +827,23 @@ def estimate_total_num_of_blocks(

if (
upstream_op_num_outputs > 0
and metrics.num_inputs_received > 0
and metrics.num_tasks_finished > 0
and metrics.average_num_inputs_per_task
and metrics.average_num_outputs_per_task
and metrics.average_rows_outputs_per_task
):
estimated_num_tasks = total_num_tasks
if estimated_num_tasks is None:
estimated_num_tasks = (
upstream_op_num_outputs
/ metrics.num_inputs_received
* num_tasks_submitted
upstream_op_num_outputs / metrics.average_num_inputs_per_task
)

estimated_num_output_bundles = round(
estimated_num_tasks
* metrics.num_outputs_of_finished_tasks
/ metrics.num_tasks_finished
estimated_num_tasks * metrics.average_num_outputs_per_task
)
estimated_output_num_rows = round(
estimated_num_tasks
* metrics.rows_task_outputs_generated
/ metrics.num_tasks_finished
estimated_num_tasks * metrics.average_rows_outputs_per_task
Comment on lines +837 to +844
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these metrics are None, this code will raise a TypeError:

  • average_num_inputs_per_task
  • average_num_outputs_per_task
  • average_rows_outputs_per_task

This can't happen right now because we check num_tasks_finished > 0, and the three metrics aren't None when num_tasks_finished > 0. But, that's an implementation detail, and isn't guaranteed by their interface.

This code would be more robust if replaced this check:

        and metrics.num_inputs_received > 0
        and metrics.num_tasks_finished > 0

With explicit checks that the metrics aren't None.

)

return (
estimated_num_tasks,
estimated_num_output_bundles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def __init__(
data_context: DataContext,
input_data: Optional[List[RefBundle]] = None,
input_data_factory: Optional[Callable[[int], List[RefBundle]]] = None,
num_output_blocks: Optional[int] = None,
):
"""Create an InputDataBuffer.

Expand All @@ -30,8 +29,6 @@ def __init__(
object to use injestion.
input_data: The list of bundles to output from this operator.
input_data_factory: The factory to get input data, if input_data is None.
num_output_blocks: The number of output blocks. If not specified, progress
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not being used anywhere

bars total will be set based on num output bundles instead.
"""
super().__init__("Input", [], data_context)
if input_data is not None:
Expand Down