Skip to content

Commit 2e5abf6

Browse files
committed
chore: add logs to ensure task A actually completes
1 parent 1d87012 commit 2e5abf6

File tree

1 file changed

+15
-3
lines changed

1 file changed

+15
-3
lines changed

aws_advanced_python_wrapper/writer_failover_handler.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,19 @@ def get_result_from_future(self, current_topology: Tuple[HostInfo, ...]) -> Writ
124124
futures = [executor.submit(self.reconnect_to_writer, writer_host),
125125
executor.submit(self.wait_for_new_writer, current_topology, writer_host)]
126126
for future in as_completed(futures, timeout=self._max_failover_timeout_sec):
127+
logger.info("future completed")
127128
result = future.result()
128129
if result.is_connected:
129130
self.log_task_success(result)
130131
return result
131132
if result.exception is not None:
132133
return result
133134
except TimeoutError:
135+
logger.info("futures timed out")
136+
logger.info("setting timeout_event 1")
134137
self._timeout_event.set()
135138
finally:
139+
logger.info("setting timeout_event 2")
136140
self._timeout_event.set()
137141
finally:
138142
executor.shutdown(wait=False)
@@ -181,12 +185,15 @@ def reconnect_to_writer(self, initial_writer_host: HostInfo):
181185
if not self._plugin_service.is_network_exception(ex):
182186
logger.debug("WriterFailoverHandler.TaskAEncounteredException", ex)
183187
return WriterFailoverResult(False, False, None, None, "TaskA", ex)
188+
else:
189+
logger.info("[WriterFailoverHandler] [TaskA] encountered a network exception: " + str(ex))
184190

185191
if latest_topology is None or len(latest_topology) == 0:
192+
logger.debug("sleeping because latest_topology is " + ("none" if latest_topology is None else str(len(latest_topology))))
186193
sleep(self._reconnect_writer_interval_sec)
187-
else:
188-
success = self.is_current_host_writer(latest_topology, initial_writer_host)
189194

195+
success = self.is_current_host_writer(latest_topology, initial_writer_host)
196+
logger.debug("[TaskA] success: " + str(success))
190197
self._plugin_service.set_availability(initial_writer_host.as_aliases(), HostAvailability.AVAILABLE)
191198
return WriterFailoverResult(success, False, latest_topology, conn if success else None, "TaskA", None)
192199

@@ -197,10 +204,14 @@ def reconnect_to_writer(self, initial_writer_host: HostInfo):
197204
finally:
198205
try:
199206
if conn is not None and not success:
207+
logger.info("Task A ending but didn't succeed, closing connection")
200208
conn.close()
201-
except Exception:
209+
except Exception as e:
210+
logger.error("Encountered error while closing connection: " + str(e))
202211
pass
203212
logger.debug("WriterFailoverHandler.TaskAFinished")
213+
logger.info("returning from reconnect_to_writer")
214+
204215

205216
def is_current_host_writer(self, latest_topology: Tuple[HostInfo, ...], initial_writer_host: HostInfo) -> bool:
206217
latest_writer: Optional[HostInfo] = self.get_writer(latest_topology)
@@ -239,6 +250,7 @@ def wait_for_new_writer(self, current_topology: Tuple[HostInfo, ...], current_ho
239250
finally:
240251
self.cleanup()
241252
logger.debug("WriterFailoverHandler.TaskBFinished")
253+
logger.info("returning from wait_for_new_writer")
242254

243255
def connect_to_reader(self) -> None:
244256
while not self._timeout_event.is_set():

0 commit comments

Comments
 (0)