Skip to content
33 changes: 32 additions & 1 deletion airflow/sensors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def run_duration() -> float:
raise AirflowRescheduleException(reschedule_date)
else:
time.sleep(self._get_next_poke_interval(started_at, run_duration, poke_count))
poke_count += 1
poke_count += 1
Comment thread
sanket2000 marked this conversation as resolved.
Outdated
self.log.info("Success criteria met. Exiting.")
return xcom_value

Expand All @@ -344,6 +344,37 @@ def _get_next_poke_interval(
if not self.exponential_backoff:
return self.poke_interval

if self.reschedule:
# Calculate elapsed time since the sensor started
elapsed_time = run_duration()

# Initialize variables for the simulation
cumulative_time = 0
estimated_poke_count = 0

while cumulative_time < elapsed_time:
estimated_poke_count += 1
# Calculate min_backoff for the current try number
min_backoff = max(int(self.poke_interval * (2 ** (estimated_poke_count - 1))), 1)

# Calculate the jitter
Comment thread
sanket2000 marked this conversation as resolved.
run_hash = int(
hashlib.sha1(
f"{self.dag_id}#{self.task_id}#{started_at}#{estimated_poke_count}".encode()
).hexdigest(),
16,
)
modded_hash = min_backoff + run_hash % min_backoff

# Calculate the interval with jitter
interval_with_jitter = min(modded_hash, timedelta.max.total_seconds() - 1)

# Add the interval to the cumulative time
cumulative_time += interval_with_jitter

# Now we have an estimated_poke_count based on the elapsed time
poke_count = estimated_poke_count

# The value of min_backoff should always be greater than or equal to 1.
min_backoff = max(int(self.poke_interval * (2 ** (poke_count - 2))), 1)

Expand Down