The consumer commits (manual commits) the events' offsets every X events; the events are saved in a DB and the matching offsets in a second DB table. If the consumer instance crashes, at restart it looks up the last saved offset (and thus the last saved event) and commits the uncommitted offsets up to (and including) that offset; the consumer should then resume processing the next unprocessed events.
The problem is that at restart, even though the offsets (up to and including the one recorded in the DB) are committed, the last committed one (per partition) is redelivered, which is undesirable because it duplicates that event at the consumer.
To fix it I tried Seek() or Assign() to the offset after the last committed one. This does advance the consumer's offset, but it does not affect the cluster's offset reference, so the result is even worse: I end up with something like "topic[0]6" on the cluster and "topic[0]7" on the consumer, and the last committed message (per partition) is still redelivered.
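One detail worth checking here: in Kafka, a committed offset denotes the *next* message the group should consume, not the last one processed. Committing offset 15 therefore means "resume at 15", which matches the redelivery described above. A minimal sketch of the arithmetic (the function name `nextCommitOffset` is mine, not part of the client library):

```go
package main

import "fmt"

// nextCommitOffset returns the offset to commit after processing a message
// at the given offset. Kafka interprets a committed offset as "resume
// consuming here", so it must point one past the last processed message.
func nextCommitOffset(lastProcessed int64) int64 {
	return lastProcessed + 1
}

func main() {
	// Having processed data-database[2]@15, commit 16 so @15 is not redelivered.
	fmt.Println(nextCommitOffset(15))
}
```

If the offsets stored in the DB are the last *processed* offsets, committing them as-is would reproduce exactly the duplicate shown in the log below.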
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
config := &kafka.ConfigMap{
"bootstrap.servers": "127.0.0.1:29092",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
"enable.auto.commit": false,
}
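For reference, the "commit every X events" batching described above can be sketched without any Kafka dependency; the `batcher` type and its names are hypothetical, purely to illustrate the counting logic:

```go
package main

import "fmt"

// batcher counts processed events and signals when a batch is full,
// i.e. when the consumer should issue a manual commit.
type batcher struct {
	batchSize int
	count     int
}

// record registers one processed event and reports whether it is time
// to commit the batch. The counter resets after each full batch.
func (b *batcher) record() bool {
	b.count++
	if b.count >= b.batchSize {
		b.count = 0
		return true
	}
	return false
}

func main() {
	b := &batcher{batchSize: 3}
	for i := 1; i <= 7; i++ {
		if b.record() {
			fmt.Printf("commit after event %d\n", i)
		}
	}
}
```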
// 1- consumer restarts (after crashing)
// 2- commits first (the events already saved in the DB)
Committed offsets: [data-database[2]@14 data-database[2]@15]
// 3- the last committed event is redelivered anyway, creating a duplicate
Message on data-database[2]@15:
Annie
// 4- then the next events arrive
Message on data-database[2]@16:
Bob
// If I call Seek() after step 2, the next commit response shows
Committed offsets: [data-database[2]@17] // which is data-database[2]@16: Bob
Any idea what I could add or change to achieve "exactly once" event processing? Thanks
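Since Kafka consumption is at-least-once, the usual way to get effectively-exactly-once delivery into a database is to make the write idempotent: key each stored event by (topic, partition, offset) and skip anything at or below the last offset already recorded for that partition, rather than relying on the broker never redelivering. A minimal in-memory sketch, assuming the DB's offsets table can be queried per partition (the `offsetStore` type and names are mine):

```go
package main

import "fmt"

// offsetStore remembers the highest offset already persisted per partition,
// mimicking the "offsets table" in the database from the question.
type offsetStore map[int32]int64

// shouldProcess reports whether a message at (partition, offset) is new.
// Duplicates delivered after a restart are filtered out here, so a
// redelivered message never produces a second row in the DB.
func (s offsetStore) shouldProcess(partition int32, offset int64) bool {
	last, ok := s[partition]
	if ok && offset <= last {
		return false // already persisted: drop the duplicate
	}
	s[partition] = offset
	return true
}

func main() {
	s := offsetStore{2: 15} // DB says partition 2 was saved up to offset 15
	fmt.Println(s.shouldProcess(2, 15)) // redelivered duplicate: skipped
	fmt.Println(s.shouldProcess(2, 16)) // genuinely new event: processed
}
```

In practice the check and the event insert would go in the same DB transaction, so a crash between the two cannot leave them inconsistent.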