Integrate with Fabric Event Streams #170

Open
ePaul opened this issue Jun 8, 2023 · 5 comments
Labels
auto-configuration · enhancement · nakadi-submission · persistence

Comments


ePaul commented Jun 8, 2023

Background
Fabric Event Streams (FES) is a (Zalando-internal) solution (internal link) for change data capture. In its original and most common configuration, it reads a PostgreSQL database's write-ahead log (e.g. of an outbox table) and sends the DB changes out as Nakadi events, providing an alternative implementation of the outbox pattern that this library implements.
FES is configured via a custom K8s resource (with some extensibility allowed via AWS lambdas).

Idea
The idea here is to allow using this library's EventLogWriter interface to create the eventlog entries, but then use FES to send them out (skipping the whole scheduler + Nakadi interface).

Details:

  • still have one outbox table.
  • insert + immediately delete as part of the fire* methods (see the sketch after this list).
  • configure FES to listen to this outbox table and take all events from there, submit to the event type indicated by the row.
  • no configuration of Nakadi interface needed
  • no scheduler for event transmitter needed
  • There should be a single (boolean) configuration property for the spring-boot-starter which triggers this behavior, possibly some additional ones if there is something to tweak.
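
As a conceptual sketch of the insert + immediate delete, the fire* methods would effectively run something like the following (column names are illustrative, not necessarily the library's actual schema):

BEGIN;
-- the INSERT writes the full event into the WAL, where FES picks it up
INSERT INTO nakadi_producer_eventlog (id, event_type, event_body_data)
VALUES (42, 'example.event-type', '{"orderNumber": "123"}');
-- the immediate DELETE keeps the table empty; the delete record in the WAL
-- would have to be ignored on the FES side
DELETE FROM nakadi_producer_eventlog WHERE id = 42;
COMMIT;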

Benefits

  • The Outbox table can be write+delete only, no reads needed. As we delete immediately, the table won't grow.
  • FES is optimized to allow high throughput – the same can't be said for this library.
  • Other transformations are possible in FES.
ePaul added the enhancement, nakadi-submission and auto-configuration labels on Jun 8, 2023

ePaul commented Jun 8, 2023

To investigate:

  • How much configuration is needed from FES side for this? How can we keep it minimal?
  • Can we keep our existing DB schema, or should we have one which is FES-optimized?
  • What would be a migration path for an application using the existing submission schema?


ePaul commented Jun 8, 2023

A discussion in the (Zalando-internal) FES chat room showed two options:

a. no change in FES:

  • change the outbox structure (or maybe better get a new outbox table) to just id + payload (see the sketch after this list)
  • have the EventLogWriter add all the needed metadata (including event type) to the payload before writing it into the DB
  • do a delete after the insert
  • disable the transmission scheduler
  • use the standard PostgresLogicalReplication source with this outbox table
  • use the standard PostgresWalToGenericNakadiEvent flow
  • use the standard Nakadi sink (without specified event type).
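
For illustration, such a reduced outbox table might look like this (table name and column types are assumptions, nothing agreed on yet):

-- hypothetical minimal outbox table: all metadata lives inside the payload
CREATE TABLE nakadi_producer_fes_outbox (
    id      BIGSERIAL PRIMARY KEY,
    payload JSONB NOT NULL  -- complete Nakadi event, incl. metadata and event type
);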

b. minimal changes to Nakadi-producer:

  • keep the outbox structure as-is (possibly remove locking fields)
  • do a delete after the insert
  • disable the transmission scheduler
  • use the standard PostgresLogicalReplication source with this outbox table
  • build a new flow into FES which works similarly to EventTransmissionService.mapToNakadiEvent (+ put the event type into the metadata)
  • use the standard Nakadi sink (without specified event type).

In both cases we'll need to think carefully about the roll-out sequence, especially when changing from an existing setup.


ePaul commented Jun 19, 2023

Trying to implement this now (variant (b)).

I mostly got the FES part done, and disabling the sending out is also easy (we already have a property for that).

The surprisingly difficult part is the "do a delete after the insert":
Our repository.delete() methods use the id, but the persist methods don't return it. This is caused by the way they're implemented – JDBC's update and batchUpdate can only return the number of affected rows.

I guess we could try to use query (with INSERT INTO ... RETURNING id or similar), but that doesn't support batching (except for DB2's JDBC driver, it seems).

Maybe we can do the INSERT and DELETE immediately in the same statement?

WITH ids AS (
   INSERT INTO nakadi_producer_eventlog (....)
   VALUES (...)
   RETURNING id
)
DELETE FROM nakadi_producer_eventlog AS el
 USING ids
 WHERE ids.id = el.id
... if that works (i.e. if this actually inserts and immediately deletes a row, leaving two WAL entries but nothing in the table), this could certainly be batched.


ePaul commented Jun 20, 2023

The idea mentioned above doesn't work – the DELETE doesn't see the effects of the INSERT, so it doesn't delete anything:

The sub-statements in WITH are executed concurrently with each other and with the main query. Therefore, when using data-modifying statements in WITH, the order in which the specified updates actually happen is unpredictable. All the statements are executed with the same snapshot (see Chapter 13), so they cannot “see” one another's effects on the target tables. This alleviates the effects of the unpredictability of the actual order of row updates, and means that RETURNING data is the only way to communicate changes between different WITH sub-statements and the main query.

(From 7.8.4. Data-Modifying Statements in WITH, emphasis mine.)

Here is a list of other options to look at:

  • Skip the batching for INSERTs so each statement can return its ID (at the cost of more DB round-trips), then do a batch DELETE (or a DELETE with IN ...).
  • Build manual batching as described in https://javaranch.com/journal/200510/batching.html – i.e. a many-row INSERT returning many IDs, then using these for the DELETE (see the sketch after this list).
  • Assign the IDs in Java before INSERTing, so I know what to DELETE (but then I need to take care to avoid duplicates when multiple instances are inserting).
  • Keep the job and have it just do the DELETEs, no actual Nakadi interaction.
  • (by @remychantenay) Set up postgresql triggers to delete automatically (might create deadlocks on high concurrency)
  • Use a stored procedure or a DO block (with PL/PGSQL) to do the INSERT + DELETE logic completely on PG side.
  • (by @remychantenay) Directly send WAL messages without even writing to the table (pg_logical_emit_message)
    • That likely needs some adaptations on the FES side to read these messages properly, or could mess up replication to actual replicas.
  • (by @hughcapet) at the end of the transaction, delete all rows in the eventlog table with xmin = txid_current().
    • this likely needs some exploration to see how we can hook into the end-of-transaction processing.
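
To illustrate the manual-batching option from the list above: one many-row INSERT returns all generated IDs in a single round-trip, and a second statement deletes exactly those rows. Column names and the ANY(ARRAY[...]) form are illustrative; the actual implementation in #173 + #174 may differ.

-- statement 1: insert a whole batch in one round-trip, returning the ids
INSERT INTO nakadi_producer_eventlog (event_type, event_body_data)
VALUES ('example.event-type', '{"n": 1}'),
       ('example.event-type', '{"n": 2}')
RETURNING id;

-- statement 2: delete exactly those rows, using the ids from statement 1
DELETE FROM nakadi_producer_eventlog
 WHERE id = ANY (ARRAY[101, 102]);  -- ids returned by statement 1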

ePaul added the persistence label on Jul 31, 2023

ePaul commented Jul 31, 2023

In #173 + #174 I'm exploring how the "manual batching" approach might work. At least in a test it seems to properly delete the items immediately. I'll need to plug this together to see how it works end-to-end.
