Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions karton/core/asyncio/karton.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,11 @@ async def internal_process(self, task: Task) -> None:
self.current_task = task

if not task.matches_filters(self.filters):
self.log.info("Task rejected because binds are no longer valid.")
self.log.info(
"Task rejected because binds are no longer valid. "
"Rejected ask headers: %s",
task.headers,
)
await self.backend.set_task_status(task, TaskState.FINISHED)
# Task rejected: end of processing
return
Expand Down Expand Up @@ -301,7 +305,13 @@ async def _loop(self) -> None:
if not old_bind:
self.log.info("Service binds created.")
elif old_bind != self._bind:
self.log.info("Binds changed, old service instances should exit soon.")
self.log.info(
"Binds changed, old service instances should exit soon. "
"Old binds: %s "
"New binds: %s",
old_bind,
self._bind,
)

for task_filter in self.filters:
self.log.info("Binding on: %s", task_filter)
Expand All @@ -312,7 +322,13 @@ async def _loop(self) -> None:
while True:
current_bind = await self.backend.get_bind(self.identity)
if current_bind != self._bind:
self.log.info("Binds changed, shutting down.")
self.log.info(
"Binds changed, shutting down. "
"Old binds: %s "
"New binds: %s",
self._bind,
current_bind,
)
break
if self.concurrency_semaphore is not None:
await self.concurrency_semaphore.acquire()
Expand Down
25 changes: 21 additions & 4 deletions karton/core/karton.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,11 @@ def internal_process(self, task: Task) -> None:
self.current_task = task

if not task.matches_filters(self.filters):
self.log.info("Task rejected because binds are no longer valid.")
self.log.info(
"Task rejected because binds are no longer valid. "
"Rejected ask headers: %s",
task.headers,
)
self.backend.set_task_status(task, TaskState.FINISHED)
# Task rejected: end of processing
return
Expand Down Expand Up @@ -339,15 +343,28 @@ def loop(self) -> None:
if not old_bind:
self.log.info("Service binds created.")
elif old_bind != self._bind:
self.log.info("Binds changed, old service instances should exit soon.")
self.log.info(
"Binds changed, old service instances should exit soon. "
"Old binds: %s "
"New binds: %s",
old_bind,
self._bind,
)

for task_filter in self.filters:
self.log.info("Binding on: %s", task_filter)

with self.graceful_killer():
while not self.shutdown:
if self.backend.get_bind(self.identity) != self._bind:
self.log.info("Binds changed, shutting down.")
current_bind = self.backend.get_bind(self.identity)
if current_bind != self._bind:
self.log.info(
"Binds changed, shutting down. "
"Old binds: %s "
"New binds: %s",
self._bind,
current_bind,
)
break
task = self.backend.consume_routed_task(self.identity)
if task:
Expand Down