
Implement event stream subscriptions; port social example. #3784

Merged

afck merged 21 commits into linera-io:main from afck:update-streams on Apr 23, 2025

Conversation


@afck afck commented Apr 16, 2025

Motivation

We want to replace pub-sub channels with event streams for better scalability (#365).

Events can already be emitted and read; this PR adds subscriptions.

Proposal

Handle the UpdateStreams system operation: this is how chain owners inform the applications on a chain about new events in streams published by other chains. Also add back subscribe_to_events and unsubscribe_from_events.

Keep a note in the TransactionTracker about any changes to event subscriptions or new events in relevant streams. As part of the same transaction, call process_streams in all subscriber contracts.

When synchronizing a chain, and in the chain listener, detect new events and commit UpdateStreams as appropriate. (This is part of process_inbox now.)

Port the social example to use event streams instead of pub-sub channels, without changing any of the business logic (which should be fixed, too, but that is a separate issue: #3782).
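The subscriber-side flow described above can be sketched standalone. This follows the LIP-20 design mentioned below, where the runtime only reports index bounds and the contract decides what to read; `StreamUpdate`, `new_indices`, and `process_streams` follow the names used in this PR, but everything else (plain strings for stream names, the `Subscriber` struct) is illustrative, not the actual linera-sdk API:

```rust
use std::collections::BTreeMap;
use std::ops::Range;

/// Per-stream update reported to a subscriber: only index bounds are
/// passed; reading the events is up to the contract. Types here are
/// illustrative stand-ins for the real SDK types.
#[derive(Debug, Clone)]
pub struct StreamUpdate {
    pub stream: String,
    /// The lowest index of a new event.
    pub previous_index: u32,
    /// One past the highest index of a new event.
    pub next_index: u32,
}

impl StreamUpdate {
    /// The indices of the new events, using Rust range conventions.
    pub fn new_indices(&self) -> Range<u32> {
        self.previous_index..self.next_index
    }
}

/// Toy subscriber contract that remembers how far it has read per stream.
#[derive(Default)]
pub struct Subscriber {
    pub next_read: BTreeMap<String, u32>,
}

impl Subscriber {
    /// Analogue of the `process_streams` entry point: called as part of the
    /// same transaction that recorded the updates. Returns which events were
    /// read, for illustration.
    pub fn process_streams(&mut self, updates: &[StreamUpdate]) -> Vec<(String, u32)> {
        let mut read = Vec::new();
        for update in updates {
            for index in update.new_indices() {
                read.push((update.stream.clone(), index));
            }
            self.next_read.insert(update.stream.clone(), update.next_index);
        }
        read
    }
}
```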

Test Plan

CI; the two pub-sub channel tests have been adapted.

Release Plan

  • Nothing to do / These changes follow the usual release cycle.

Links

@afck afck marked this pull request as ready for review April 16, 2025 16:11
@afck afck requested review from bart-linera, deuszx and ma2bd April 16, 2025 16:11
@afck afck mentioned this pull request Apr 17, 2025

afck commented Apr 18, 2025

I thought this decision - which events to read - will be done externally as well.

No, the current design (see LIP-20) just updates the contracts about the highest index; the rest is up to the contract.

if subscriptions.next_index >= next_index {
    continue;
}
subscriptions.next_index = next_index;
Contributor

Maybe we should update the index after we act on it.

Contributor Author

I moved this line to after we put the information into the transaction tracker.

Or do you mean we should update it only after calling process_streams in all the applications? That would complicate the code quite a bit, because we use the same code to call process_streams, regardless of whether we just processed UpdateStreams or any other operation or message that may have added a new subscription.

Contributor

@deuszx deuszx left a comment

First pass.

let Some(subscribers) = self.event_subscribers.get(&chain_id).cloned() else {
    return Ok(());
};
for subscriber_id in subscribers {
Contributor

subscriber_id is a chain so I think it actually makes more sense to call maybe_process_inbox(chain_id) than this - we already established the domain language that chains have inboxes - subscribers don't - so let's stick to that.

Contributor Author

You mean you'd rename this to subscriber_chain_id? And subscribers to subscriber_chains?

But then event_subscribers should also be event_subscriber_chains?

If I understand correctly you're saying the words "publisher" and "subscriber" should apply to (ChainId, ApplicationId), and therefore we should use "publishing chain" and "subscribing chain" for ChainId? I can try to make those changes across the board, just want to make sure I'm not misunderstanding.

Comment on lines +537 to +543
self.context()
    .extra()
    .get_event(EventId {
        chain_id,
        stream_id: stream_id.clone(),
        index,
    })
Contributor

This is just to make sure that we have that event locally?

Contributor Author

Yes. Event values are supposed to be small anyway, but I can replace this with something like contains_event.
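The difference being discussed can be illustrated with a toy local store, contrasting `get_event` with a hypothetical `contains_event`. The `EventStore` type, `u64` chain IDs, and `Vec<u8>` values are stand-ins, not the real storage API:

```rust
use std::collections::BTreeMap;

/// Toy local event store, keyed by (chain, stream, index).
pub struct EventStore {
    pub events: BTreeMap<(u64, String, u32), Vec<u8>>,
}

impl EventStore {
    /// What the reviewed code does via `get_event`: fetch the whole value,
    /// which also proves the event exists locally.
    pub fn get_event(&self, chain_id: u64, stream: &str, index: u32) -> Option<&Vec<u8>> {
        self.events.get(&(chain_id, stream.to_string(), index))
    }

    /// Hypothetical cheaper check: enough for a validator that only needs
    /// to confirm that the events a block proposer claims actually exist.
    pub fn contains_event(&self, chain_id: u64, stream: &str, index: u32) -> bool {
        self.events.contains_key(&(chain_id, stream.to_string(), index))
    }
}
```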

Contributor

I was wondering whether this call here is needed at all? The assumption is that we have the events, it's not like we need to sync it here

Contributor Author

In the client maybe, but the server needs to verify that the events the block proposer claims exist actually exist.

@afck afck force-pushed the update-streams branch 2 times, most recently from ddcd0da to 8df8f4a on April 22, 2025 11:07
Comment on lines +1325 to +1326
/// The lowest index of a new event.
pub previous_index: u32,
Contributor

Maybe add somewhere that the new events have indices in the range previous_index..next_index (using Rust conventions)?

Contributor

oh I just saw the function below. Maybe "see new_indices below"

Contributor Author

Done in 5484b8c.


afck commented Apr 23, 2025

I mentioned yesterday that I might modify the social example to remove existing received_posts entries when we unsubscribe. However, received_posts is a map view, so re-subscribing carries no danger of duplicating posts, which makes this less important.

(I could do it anyway, if we think these posts should disappear on unsubscribing, but I'm not sure. It's also kind of unrelated to this PR now.)

let publishers = self
    .update_event_subscriptions(notification.chain_id)
    .await?;
if !publishers.is_empty() {
Contributor

This if shouldn't be necessary - if publishers is empty then listen_recursively will return early.

Contributor Author

That's true. But maybe we should still avoid needlessly calling process_inbox.

Contributor

I think we should be allowed to call process_inbox without a priori knowledge of whether it's necessary - i.e. internally the method should quickly (and easily) verify whether the call will be idempotent or not.

Contributor Author

But process_inbox can't know whether we just started listening to any new publishing chains, so it would always have to look through them again, and load the relevant data from storage.

Contributor

@deuszx deuszx left a comment

Another batch.

Comment on lines +772 to +774
if self.chain_id != self.admin_id {
    publishers.insert(self.admin_id);
}
Contributor

Do we need this special-casing? I think in general a chain should not subscribe to itself so we should always add the admin_id and always make sure that we don't subscribe to our chain.

Contributor Author

I don't understand how we could get rid of that: If our chain is the admin chain, we don't need to synchronize the admin chain again here. If it isn't, we do.

Contributor

I wanted to say that we would filter out our own chain "deeper" in the callstack but in the other comment you wrote that sometimes we do want.


.iter()
.map(|(chain_id, _)| *chain_id)
.chain(iter::once(self.admin_id))
.filter(|chain_id| *chain_id != self.chain_id)
Contributor

Maybe move this check into synchronize_chain_state and return early. That way we don't need to remember about filtering it out everywhere.

Contributor Author

But in other call sites we probably do want to synchronize our own chain.
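The chain-collection logic in the snippet above could be sketched standalone like this; chain IDs are modeled as plain `u64` and the function name is hypothetical:

```rust
use std::collections::BTreeSet;
use std::iter;

/// Collect the chains to synchronize: every publishing chain any local
/// application subscribes to, plus the admin chain (for new epochs), but
/// never our own chain.
pub fn publisher_chains(subscriptions: &[(u64, &str)], admin_id: u64, own_id: u64) -> BTreeSet<u64> {
    subscriptions
        .iter()
        .map(|(chain_id, _stream)| *chain_id)
        .chain(iter::once(admin_id))
        .filter(|chain_id| *chain_id != own_id)
        .collect()
}
```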


/// Synchronizes all chains that any application on this chain subscribes to.
/// We always consider the admin chain a relevant publishing chain, for new epochs.
async fn synchronize_publisher_chains(&self) -> Result<(), ChainClientError> {
Contributor

Do we need _publisher_ in the method name? We synchronize the chain state which is more general.

The point I'm trying to make is whether we need to special case the events/publishers here or maybe we want to synchronize "all chains that we track"?

Contributor Author

But here we specifically synchronize those chains that our own chain is subscribed to. We don't want to synchronize all tracked chains here.

.or_insert_with(|| (previous_index, next_index));
}

pub fn remove_stream_to_process(
Contributor

Can you add a comment here? This method deletes information but it's not explained when it should be used (or expected to be).

Contributor

Maybe remove_event_stream ?

Contributor Author

Same as below: We're not really removing an event stream, just the note that we need to update a particular app about a particular stream. So I'm not sure the changed name would be better.

self.operation_result = result
}

pub fn add_stream_to_process(
Contributor

Maybe just add_new_event_stream?

Contributor Author

But this isn't necessarily a new stream: It can be called either because of a new subscription to a stream, or because of new events in an existing subscription's stream.
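The tracker bookkeeping being discussed can be sketched standalone. The method names mirror the snippets above, but the key types and the range-widening on repeated calls are assumptions of this sketch, not necessarily the real TransactionTracker behavior:

```rust
use std::collections::BTreeMap;

/// Per-transaction note of which (application, stream) pairs still need a
/// `process_streams` call, with the index range to report. Applications and
/// streams are plain strings here for illustration.
#[derive(Default)]
pub struct TransactionTracker {
    pub streams_to_process: BTreeMap<(String, String), (u32, u32)>,
}

impl TransactionTracker {
    /// Note that `app` must process `stream`: called either because of a new
    /// subscription, or because of new events in an existing subscription.
    pub fn add_stream_to_process(
        &mut self,
        app: &str,
        stream: &str,
        previous_index: u32,
        next_index: u32,
    ) {
        let entry = self
            .streams_to_process
            .entry((app.to_string(), stream.to_string()))
            .or_insert((previous_index, next_index));
        // Widen the range if the stream was already noted (an assumption
        // of this sketch).
        entry.0 = entry.0.min(previous_index);
        entry.1 = entry.1.max(next_index);
    }

    /// Drop the note again, e.g. when the subscription is removed within the
    /// same transaction, so no update needs to be delivered.
    pub fn remove_stream_to_process(&mut self, app: &str, stream: &str) {
        self.streams_to_process
            .remove(&(app.to_string(), stream.to_string()));
    }
}
```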

@afck afck merged commit f9f7fc8 into linera-io:main Apr 23, 2025
26 checks passed
@afck afck deleted the update-streams branch April 23, 2025 15:45
afck added a commit that referenced this pull request Apr 23, 2025
## Motivation

With #3784, pub-sub channels are no longer needed.

## Proposal

Remove them, and the types `ChannelName`, `Origin`, `Target`, `Medium`,
`Destination`, etc.

## Test Plan

CI

## Release Plan

- Nothing to do / These changes follow the usual release cycle.

## Links

- Part of #365.
- [reviewer
checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
