
Rebuild RabbitMQ node and trigger #3244

Closed · wants to merge 21 commits

Conversation

@michael-radency (Contributor):

Continuation of #2921 by Ken-Michalak.

Ken-Michalak and others added 3 commits March 2, 2022 10:36
* Use amqp-connection-manager to automatically reconnect
* Support options on messages
* Add ability to bind a queue to an exchange
* Use prefetch on queues to prevent overloading n8n
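For reference, a minimal sketch of the connection handling described in the commit list above, assuming the amqp-connection-manager package on top of amqplib; the URL, queue name, and payload are placeholders, not the PR's actual code:

```ts
import * as amqp from 'amqp-connection-manager';

// One managed connection; amqp-connection-manager reconnects automatically
// when the broker closes the socket or the network drops.
const connection = amqp.connect(['amqp://guest:guest@localhost:5672']);

// The channel wrapper queues publishes while disconnected and re-runs the
// setup callback after every (re)connect, so the queue always exists.
const channelWrapper = connection.createChannel({
	json: true,
	setup: (channel: import('amqplib').ConfirmChannel) =>
		channel.assertQueue('example-queue', { durable: true }),
});

// Resolves once the message has been handed to the broker.
channelWrapper.sendToQueue('example-queue', { hello: 'world' }).catch(console.error);
```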
@michael-radency added the node/improvement (New feature or request) and community (Authored by a community member) labels on May 5, 2022.
@Ken-Michalak left a comment:


Besides the nack thing it seems fine. The descriptions for options were just copied from RabbitMQ docs before.

Did something with linting change recently? I remember linting it and running prettier before.

```ts
// TODO: should we nack it? It probably won't succeed on a reattempt, but it wasn't successful either.
// FIXME: how do we report a failure to the UX?
if (options?.nack) {
	channel.nack(message, false, false);
}
```


I'm not sure if this should be used. Normally you'd nack to put it back in the queue so it's reattempted, but would there be a case where that would be helpful? If it errors out on the message parsing, it's not going to succeed on a second attempt right?

If we could nack the message if the workflow failed somewhere down the line, that'd probably be ideal, but I couldn't see a way to do that.

@michael-radency (Contributor Author):


I'm actually not putting it back in the queue but rejecting it; that's what the second false does. There is a PR related to what you're suggesting, maybe you can take a look at it.
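For context on the "second false": in amqplib, nack(message, allUpTo, requeue) takes requeue as its last argument, so the call quoted above rejects the message instead of requeueing it. A minimal sketch of that consume path, with a placeholder queue name and handler:

```ts
import * as amqplib from 'amqplib';

async function consumeExample(channel: amqplib.Channel): Promise<void> {
	await channel.assertQueue('example-queue', { durable: true });

	await channel.consume('example-queue', (message) => {
		if (message === null) return; // consumer was cancelled by the broker
		try {
			const payload = JSON.parse(message.content.toString());
			// ...hand the payload to the workflow...
			channel.ack(message);
		} catch (error) {
			// nack(message, allUpTo, requeue):
			//   requeue = false rejects the message (dead-letter or drop),
			//   requeue = true would put it back on the queue for another attempt.
			channel.nack(message, false, false);
		}
	});
}
```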


Oh, nice. I didn't catch that. In that case, I don't see a reason that you wouldn't want to nack it, so you probably don't need the option. It should just always send that nack if it can't parse a message.

Does that deferred promise on the emit normally get the response of the last node in the workflow, and reject it if the workflow fails anywhere? I can take a look at implementing something like that for the emit later, but it's probably not needed to merge this in.


> I'm not sure if this should be used. Normally you'd nack to put it back in the queue so it's reattempted, but would there be a case where that would be helpful? If it errors out on the message parsing, it's not going to succeed on a second attempt right?
>
> If we could nack the message if the workflow failed somewhere down the line, that'd probably be ideal, but I couldn't see a way to do that.

Just to be sure I understand: the aim would be to be able to wait until the end of the workflow, like for the webhook trigger, and based on the workflow's result (or some data returned by the last node in the workflow), decide to ack or nack the message, right?

If I understood it right, yes. @Ken-Michalak, it would be awesome if you could take a look; that would be super useful for our use case.

@michael-radency (Contributor Author):

> Besides the nack thing it seems fine. The descriptions for options were just copied from RabbitMQ docs before.
>
> Did something with linting change recently? I remember linting it and running prettier before.

You can run `npm run nodelinter -- --target=nodes/RabbitMQ` from `packages/nodes-base/`.

@mirobertod:

Any update on this? We also need the feature to bind a queue to an exchange.
Thanks

@nivbe06 added the n8n team (Authored by the n8n team) label on Jun 1, 2022.
@michael-radency (Contributor Author):

Hello @Ken-Michalak, I made an update to this PR due to RabbitMQ node changes.

@Ken-Michalak:

@michael-radency I wasn't following what happened with the MessageTracker changes, but it looks like the majority of my changes and fixes were lost with that last merge. It might be easier to start with the code at 1647a6d and add that MessageTracker to it if that's even necessary. If that was a fix for something, you should probably test if that was happening on the rebuild first. There wasn't much left of how the original node worked in what I wrote. Only about 10 lines of the GenericFunctions.ts remained the same.

As an aside, we've been using the code from my original PR in production (with a different name), and we haven't been having any issues. The original nodes were not usable because of the reconnect and prefetch issues.

@michael-radency (Contributor Author):

@Ken-Michalak I was trying to keep changes to the current nodes to a minimum, avoid breaking changes, and use only amqplib. If that isn't possible, I will add amqp-connection-manager back.

@Ken-Michalak:

@michael-radency Yeah, amqplib doesn't do anything on errors but fire events. You'd have to listen for the right error events and then recreate the connection, consumer, queues, etc., which is what the connection manager does. There's also supposed to be only one connection with many consumers, but that's not how the old node was working.
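To illustrate the point, a rough sketch of what reconnect handling looks like with plain amqplib: the library only emits events, so the node itself has to rebuild the connection, channel, and consumer. The queue name and retry delay are illustrative, not the PR's code:

```ts
import * as amqplib from 'amqplib';

async function startConsumer(url: string): Promise<void> {
	try {
		const connection = await amqplib.connect(url);

		// amqplib only emits events on failure; nothing is re-created for us.
		connection.on('error', (error) => console.error('AMQP connection error', error));
		connection.on('close', () => {
			// Rebuild everything ourselves: connection, channel, queue, consumer.
			setTimeout(() => void startConsumer(url), 5000);
		});

		const channel = await connection.createChannel();
		await channel.assertQueue('example-queue', { durable: true });
		await channel.consume('example-queue', (message) => {
			if (message) channel.ack(message);
		});
	} catch (error) {
		// The initial connect failed as well: retry after a delay.
		setTimeout(() => void startConsumer(url), 5000);
	}
}
```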

Comment on lines 370 to 371
```ts
const connections = getAllConnections();
connections.forEach((connection) => connection.close());
```


I think this is going to close every RabbitMQ connection when one workflow (actually one node) is stopped. The connections are 1 per set of credentials and running process, and should only be closed when the entire n8n process shuts down. (You could attempt to track the number of consumers per connection and shut it down, but I thought that would be difficult to track accurately and could lead to bugs.)

https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html#connections-and-channels

When you have multiple workflows with rabbitmq nodes and multiple workers, you don't want to open and close a connection for every node, which is why there's a global map of connections.
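A sketch of that "global map of connections" idea, one shared connection per credential set rather than per node. The key derivation and helper names here are hypothetical, not the node's actual implementation:

```ts
import * as amqp from 'amqp-connection-manager';

// Module-level map: one managed connection per distinct credential set.
const connections = new Map<string, ReturnType<typeof amqp.connect>>();

interface RabbitCredentials {
	hostname: string;
	port: number;
	username: string;
	password: string;
}

function getConnection(credentials: RabbitCredentials) {
	// Hypothetical key: just enough to distinguish credential sets.
	const key = `${credentials.username}@${credentials.hostname}:${credentials.port}`;

	let connection = connections.get(key);
	if (!connection) {
		const url = `amqp://${credentials.username}:${credentials.password}@${credentials.hostname}:${credentials.port}`;
		connection = amqp.connect([url]);
		connections.set(key, connection);
	}
	return connection;
}

// Closing should happen only on process shutdown, not when a single node stops.
export async function closeAllConnections(): Promise<void> {
	await Promise.all([...connections.values()].map((connection) => connection.close()));
	connections.clear();
}
```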

@michael-radency (Contributor Author):


Hi @Ken-Michalak, I will make it an option.

@mirobertod:

@Ken-Michalak @michael-radency Hi guys, any update on this PR?

@Ken-Michalak:

@mirobertod Sorry, I'm not a maintainer, so it's out of my hands at this point. I'm not really sure what's going on either.

@michael-radency (Contributor Author):

Hello @mirobertod, @Ken-Michalak, this PR is currently in the review process. Right now I can't give an ETA, but I will let you know of any updates as soon as possible.

@mirobertod:

Hi guys, any update on this PR?

@agobrech (Contributor):

@mirobertod The issues addressed by this PR have been fixed by PRs #4019 and #3385. I will close this PR if there is no additional feature wanted.

@agobrech closed this on Jan 27, 2023.
@Ken-Michalak:

@agobrech
The reconnect and the parallel executions look good. It seems like there should be a default prefetch, but as long as we configure one it's fine. (While testing it, starting a workflow with just the trigger, the default options, and 100 messages in a queue completely overloaded n8n, using up 10+ GiB of RAM and 8 cores.)
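For reference, a minimal sketch of what a prefetch limit looks like at the amqplib level; the count and queue name are arbitrary examples, not the node's defaults:

```ts
import type { Channel } from 'amqplib';

async function consumeWithBackpressure(channel: Channel): Promise<void> {
	// Without a prefetch limit, the broker pushes every ready message at once,
	// so 100 queued messages become 100 parallel executions in n8n.
	await channel.prefetch(5); // at most 5 unacknowledged messages per consumer

	await channel.consume('example-queue', (message) => {
		if (message === null) return;
		// ...run the workflow for this message...
		channel.ack(message); // acking frees a prefetch slot for the next delivery
	});
}
```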

There are still a few other things from that PR that are missing, though. I added the ability to subscribe to an exchange or topic. What's in there now was pretty confusing at first, because you can send messages into an exchange, and the trigger says "queue / exchange", but it only actually works with a queue. The only way to get messages from a topic is to manually create the queue and binding outside of n8n, or by using an HTTP Request node attached to a trigger for the workflow-activated event.
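Roughly what "subscribe to an exchange or topic" means at the amqplib level, with placeholder exchange and routing-key names; this is the queue-and-binding setup that otherwise has to be done outside n8n:

```ts
import type { Channel } from 'amqplib';

async function subscribeToTopic(channel: Channel): Promise<void> {
	await channel.assertExchange('events', 'topic', { durable: true });

	// An exclusive, server-named queue that exists only for this consumer.
	const { queue } = await channel.assertQueue('', { exclusive: true });

	// The binding is what actually routes topic messages into the queue.
	await channel.bindQueue(queue, 'events', 'orders.*');

	await channel.consume(queue, (message) => {
		if (message) channel.ack(message);
	});
}
```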

The PR also adds the ability to set separate options on messages besides headers, like priority or content type, which can use expressions so they're different per item. (The trigger looked at the content-type header, and if it was JSON, it would parse it without needing the option set.) That might be part of #2806, but with RabbitMQ a TTL can be set as an argument on the queue and/or on specific messages.
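A sketch of the relevant amqplib publish options and queue arguments for the per-message options and TTL point; the values are examples only:

```ts
import type { Channel } from 'amqplib';

async function publishWithOptions(channel: Channel): Promise<void> {
	// TTL as a queue argument: messages expire after 60 s in this queue.
	await channel.assertQueue('example-queue', {
		durable: true,
		arguments: { 'x-message-ttl': 60000 },
	});

	// Per-message options besides headers: priority, content type, expiration.
	channel.sendToQueue('example-queue', Buffer.from(JSON.stringify({ hello: 'world' })), {
		contentType: 'application/json', // lets the trigger parse the body as JSON
		priority: 5,
		expiration: '30000', // per-message TTL in ms, passed as a string
		headers: { 'x-source': 'n8n' },
	});
}
```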

The current node does open and close the connection each time it publishes messages, which goes against RabbitMQ's best practices. I think I fixed that in that PR, but I'm not sure how much of a performance improvement it really gives.

Labels: community (Authored by a community member), n8n team (Authored by the n8n team), node/improvement (New feature or request)