Allow to throw InterruptedException in consumer workers when shutting down #2920

@jonenst

Expected Behavior

When the server receives SIGINT or SIGTERM, I would like the user code that is consuming messages to be able to react. A simple way would be to set the interrupt flag on the worker threads, so that user code can check it (or rely on methods like Thread.sleep() that check it). This is useful for consumers that take a long time to process each message (e.g. a low-bandwidth queue with a big computation for each message).
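
As an illustration of what user code could do if the worker thread's interrupt flag were set, here is a minimal plain-JDK sketch. `SlowComputation` and `sum` are hypothetical names, not part of any Spring API. The loop polls `Thread.interrupted()` between units of work and turns the flag into an `InterruptedException`.

```java
// Hypothetical example class: a long-running computation that cooperates with interruption.
final class SlowComputation {

    // Sums 0..iterations-1, checking the interrupt flag before each step.
    static long sum(long iterations) throws InterruptedException {
        long acc = 0;
        for (long i = 0; i < iterations; i++) {
            // Thread.interrupted() returns the flag and clears it.
            if (Thread.interrupted()) {
                throw new InterruptedException("interrupted at iteration " + i);
            }
            acc += i;
        }
        return acc;
    }
}
```

Since `Consumer.accept` cannot throw checked exceptions, the consuming bean would have to catch the `InterruptedException` itself, for example to record its stack trace, and rethrow it unchecked if needed.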

Current Behavior

If I understood correctly, the only options today are to wait for all prefetched messages to be consumed normally, or to wait for only the current messages to be consumed normally. There is no option to interrupt the threads consuming the messages. After a short time, the predestroy/shutdown hook stops waiting for the consumers and returns, and the JVM then halts all running threads without any notice.

Context

I would like to be able to store in my database that a message consumption was interrupted and what my code was doing when it was interrupted (a stack trace captures this information nicely).

I could install my own predestroy/shutdown hook to interrupt the threads, but doing it correctly requires a deep understanding of very low-level JVM APIs, and you need a reference to each thread. It is doable: record the current thread at the beginning of every message consumption and store it in a list of threads to interrupt. But it is a bit hard to get right.
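
The workaround described above could look roughly like the following sketch. It uses only the JDK. `InterruptibleConsumer` and `interruptActive` are hypothetical names introduced for illustration, not an existing Spring API. The wrapper records the current thread when a message starts, forgets it when the message ends, and can interrupt whatever is still running.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical wrapper: tracks the threads currently running the delegate consumer.
final class InterruptibleConsumer<T> implements Consumer<T> {

    private final Consumer<T> delegate;
    // Thread-safe set, since several listener threads may consume concurrently.
    private final Set<Thread> active = ConcurrentHashMap.newKeySet();

    InterruptibleConsumer(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(T message) {
        Thread current = Thread.currentThread();
        active.add(current);
        try {
            delegate.accept(message);
        } finally {
            // Always unregister, even if the delegate throws.
            active.remove(current);
        }
    }

    // Sets the interrupt flag on every thread still inside accept();
    // returns how many threads were interrupted.
    int interruptActive() {
        int count = 0;
        for (Thread t : active) {
            t.interrupt();
            count++;
        }
        return count;
    }
}
```

`interruptActive()` would then be called from a `@PreDestroy` method or a hook registered with `Runtime.getRuntime().addShutdownHook(...)`. Whether that hook runs early enough relative to the container's own shutdown is not something this sketch guarantees.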

Also, the fact that this option doesn't exist suggests that it may not be a good idea; maybe the documentation could clarify this.

I'm using spring-cloud-stream and creating a java.util.function.Consumer bean to consume the messages. With the default configuration it uses the SMLC container type:

spring.cloud.function.definition=consumeRun
spring.cloud.stream.bindings.consumeRun-in-0.destination=myQueue
spring.cloud.stream.bindings.consumeRun-in-0.group=myGroup
spring.cloud.stream.bindings.consumeRun-in-0.consumer.concurrency=2

@Bean
public Consumer<Message<String>> consumeRun() {
    return mySlowComputationIntensiveConsumerBean;
}

Thanks in advance
