
FEAT: Add stream_without_transaction #4526

Closed

Conversation

Zarathustra2
Contributor

Hi,

This is a draft proposal to add a new function called stream_without_transaction.

Currently Repo.stream must be run inside a transaction for Postgres. This can be an issue for multiple reasons:

Say we have a huge users table and we want to sync each user's subscription status once per day. We would probably write it as follows:

Repo.transaction(fn ->
  User
  |> Repo.stream()
  |> Stream.each(fn user ->
    # Slow API call
    sub_status = MyApp.Stripe.sub_status(user)

    if sub_status != user.sub_status do
      # Update user row
    end
  end)
  # Without Stream.run/1 the lazy stream would never be consumed
  |> Stream.run()
end, timeout: :infinity)

The issue is that the transaction runs for a very long time, and long-running transactions are problematic: all the tuples that became dead after the transaction started cannot be vacuumed until the transaction completes. This is an issue when you have tables that are updated frequently and need to be vacuumed often. Long-running transactions thus contribute to table bloat.

The new function stream_without_transaction solves this. It can be used in two ways:

  1. Use offset/limit. For smaller tables we can just use offset because its cost is negligible:

q = from(u in WebsitePage, select: u.visits)

total_visits =
  q
  |> MyApp.stream_without_transaction(limit: 500)
  |> Enum.reduce(0, fn visits, acc -> acc + visits end)
  2. For bigger tables offset becomes more expensive, so we can instead provide a custom next function which returns a new query that fetches the next "page":

next = fn q, latest_fetched_rows ->
  latest = latest_fetched_rows |> Enum.map(& &1.inserted_at) |> Enum.max(DateTime)
  ids = Enum.map(latest_fetched_rows, & &1.id)
  where(q, [e], e.inserted_at >= ^latest and e.id not in ^ids)
end

q = from(e in WebsitePage, order_by: e.inserted_at)

MyApp.stream_without_transaction(q, next: next)
|> Enum.reduce(0, fn e, acc -> acc + e.visits end)
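
To make the proposal concrete, here is a minimal sketch of how such a function could be implemented on top of Repo.all and Stream.resource. All names (MyApp.BatchStream, MyApp.Repo) are illustrative; this is not the exact diff in this PR:

defmodule MyApp.BatchStream do
  import Ecto.Query

  # Each batch is an independent query, so no long-lived transaction
  # is held open between batches.
  def stream_without_transaction(queryable, opts \\ []) do
    batch = Keyword.get(opts, :limit, 500)
    next = Keyword.get(opts, :next)

    Stream.resource(
      fn -> {queryable, 0} end,
      fn {q, offset} ->
        query = if next, do: q, else: offset(q, ^offset)

        case query |> limit(^batch) |> MyApp.Repo.all() do
          [] ->
            {:halt, :done}

          page when is_function(next) ->
            # next/2 receives the base query plus the page just fetched
            # and returns the query for the following page.
            {page, {next.(queryable, page), 0}}

          page ->
            {page, {q, offset + batch}}
        end
      end,
      fn _ -> :ok end
    )
  end
end

Note that unlike Repo.stream, each batch sees its own snapshot of the data, so rows inserted or updated between batches may or may not be observed.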

I have not added tests/docs yet, as I wanted to first see whether the maintainers agree with adding such a function.

Let me know what you think! :)

@greg-rychlewski
Member

greg-rychlewski commented Oct 11, 2024

Hi,

Please correct me if I'm misunderstanding, but I believe this is more of a pagination helper than streaming functionality. If I'm remembering correctly, Repo.stream is meant to wrap the cursor functionality of the database, so it is quite different.

If I'm not misinterpreting your suggestion, there are some nice libraries to help with pagination, like flop: https://github.com/woylie/flop.

@Zarathustra2
Contributor Author

Hi Greg,

You are right, this is more of a paginated stream. Though I see Repo.stream as a function that lets me stream database rows, and if I do not need the guarantees of a transaction I would like to opt out.

Flop does not provide this functionality as far as I can tell from quickly reading through their hex docs. I might need to research more.

The function is fairly small, but currently the only ways to stream a whole table are to use Repo.stream in a transaction (if you need the guarantees of a transaction) or to write a custom stream function.

If the function should not be added to Ecto itself, maybe we should update the docs so users are aware of the implications of long-running transactions.

I personally believe this is a nice QOL addition to Ecto. We could also update Repo.stream so that it remains the only stream function but allows opting out of transactions, something like: Repo.stream(queryable, run_without_transaction: true)

@greg-rychlewski
Member

I believe there is a bit of conflation happening between streaming and retrieving things in smaller chunks.

The idea behind stream is you take a single query and get the results in chunks. This is supported in SQL databases like Postgres and MySQL using cursors. The transaction restriction is actually not imposed by Ecto but by those databases. They require a transaction because a single statement can't be split amongst multiple transactions.

The streaming in your proposal is more of an implementation choice you made for batching: it actually fetches the results batch by batch using more than one query. Instead of using Stream.resource you could also have defined a recursive function.
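
For illustration, a recursive equivalent of that batching (hypothetical names, eager rather than lazy) might look like:

defmodule MyApp.Batch do
  import Ecto.Query

  # Fetch one page, build the following page's query with next/2, and
  # recurse until a page comes back empty.
  def reduce_in_batches(base, q, next, acc, fun) do
    case q |> limit(500) |> MyApp.Repo.all() do
      [] ->
        acc

      page ->
        reduce_in_batches(base, next.(base, page), next, Enum.reduce(page, acc, fun), fun)
    end
  end
end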

@Zarathustra2
Contributor Author

Good point. Yeah, it is more batching; maybe my function should be named batch instead of stream.

Should I go ahead and close this now, or would you want me to add some gotchas/pitfalls about long-running transactions to the Repo.stream docs?

@greg-rychlewski
Member

I think it's good advice, but the vacuuming part is very specific to Postgres. I'm not sure whether we usually put that kind of advice in Ecto SQL or here. I will take a closer look.

@greg-rychlewski
Member

Maybe you already know about this, but one of the more insidious things I ran into with long transactions was this: https://www.cybertec-postgresql.com/en/streaming-replication-conflicts-in-postgresql/.

Basically, you have a replica that is using a row in a transaction while the primary sends it an update to delete that row. If the replica's query isn't done after a certain amount of time, it is cancelled.

@warmwaffles
Member

warmwaffles commented Oct 12, 2024

For streaming outside of a transaction, I resorted to simply using a cursor in Postgres. So far I have not had issues. But for other databases (cough SQLite cough) we don't have cursors.
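
A rough sketch of that approach, with hypothetical names and no bind parameters for simplicity. WITH HOLD cursors survive the transaction that declared them, at the cost of the server materializing the result set:

defmodule MyApp.HoldCursor do
  # Repo.checkout/2 pins one connection so every FETCH runs in the
  # same session that declared the cursor.
  def each_row(repo, sql, fun, batch \\ 500) do
    repo.checkout(fn ->
      name = "hold_cursor_#{System.unique_integer([:positive])}"
      # Runs in its own short implicit transaction; WITH HOLD keeps the
      # cursor usable after that transaction commits.
      repo.query!("DECLARE #{name} CURSOR WITH HOLD FOR #{sql}")

      try do
        fetch_loop(repo, name, batch, fun)
      after
        repo.query!("CLOSE #{name}")
      end
    end)
  end

  defp fetch_loop(repo, name, batch, fun) do
    case repo.query!("FETCH #{batch} FROM #{name}") do
      %{rows: []} ->
        :ok

      %{rows: rows} ->
        Enum.each(rows, fun)
        fetch_loop(repo, name, batch, fun)
    end
  end
end

e.g. MyApp.HoldCursor.each_row(MyApp.Repo, "SELECT id, email FROM users", &IO.inspect/1)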

@josevalim
Member

Agreed with everything @greg-rychlewski said. This is not the same as the streaming functionality and it emits different results. Given this can be implemented fully on top of Ecto Repos and does not require new functionality at the adapter level, I suggest implementing it as a library. :) Thank you.

josevalim closed this on Oct 12, 2024