Add more comments #73

Merged 1 commit on Jan 19, 2024
15 changes: 15 additions & 0 deletions cmd/event-service/di/service.go
@@ -77,35 +77,50 @@ func (s Service) Run(ctx context.Context) error {
runners := 0

runners++
// Serve HTTP.
go func() {
errCh <- errors.Wrap(s.server.ListenAndServe(ctx), "server error")
}()

// Fetch events from the relays stored in the database and send them to the in-memory pubsub.
runners++
go func() {
errCh <- errors.Wrap(s.downloader.Run(ctx), "downloader error")
}()

// Subscribe to the in-memory pubsub of events and emit a NewSaveReceivedEvent
// command that performs some checks on the event, saves it if the checks
// pass, and emits an EventSavedEvent to the sqlite pubsub.
runners++
go func() {
errCh <- errors.Wrap(s.receivedEventSubscriber.Run(ctx), "received event subscriber error")
}()

// Subscribe to saved events in the database. This uses the sqlite pubsub. This triggers:
// - analysis to extract new relays and store them in the db; they will be used by the downloader.
// - analysis to extract pubkeys and store them in the db (contacts_followees, pubkeys, contacts_events); this will be used by the downloader.
// - publishing to the watermill pubsub
// - possibly publishing the event to wss://relay.nos.social if it is metadata related
runners++
go func() {
errCh <- errors.Wrap(s.eventSavedEventSubscriber.Run(ctx), "event saved subscriber error")
}()

// The metrics timer collects metrics from the app.
runners++
go func() {
errCh <- errors.Wrap(s.metricsTimer.Run(ctx), "metrics timer error")
}()

// Sqlite transaction runner
runners++
go func() {
errCh <- errors.Wrap(s.transactionRunner.Run(ctx), "transaction runner error")
}()

// The task scheduler creates sequential time-window-based tasks that
// contain filters to be applied to each relay to fetch the events we want.
// Event downloaders subscribe to this.
runners++
go func() {
errCh <- errors.Wrap(s.taskScheduler.Run(ctx), "task scheduler error")
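The rest of Run is collapsed in this diff, but the visible pattern (increment runners for every goroutine started and have each goroutine report exactly one wrapped error on errCh) implies a loop that waits for that many results. A minimal, self-contained sketch of that pattern; runAll and its helpers are illustrative names, not the repository's actual code:

package main

import (
	"context"
	"errors"
	"fmt"
)

// runAll mimics the runner pattern above: start one goroutine per component,
// have each send exactly one (possibly nil) error on errCh, then wait for as
// many results as there are runners, returning the first failure seen.
func runAll(ctx context.Context, components ...func(context.Context) error) error {
	errCh := make(chan error)
	runners := 0
	for _, component := range components {
		component := component
		runners++
		go func() {
			errCh <- component(ctx)
		}()
	}
	for i := 0; i < runners; i++ {
		if err := <-errCh; err != nil {
			return err
		}
	}
	return nil
}

func main() {
	err := runAll(context.Background(),
		func(ctx context.Context) error { return nil },
		func(ctx context.Context) error { return errors.New("downloader error") },
	)
	fmt.Println(err) // prints the first error reported by a runner
}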
2 changes: 2 additions & 0 deletions service/app/handler_process_saved_event.go
@@ -81,6 +81,7 @@ func (h *ProcessSavedEventHandler) Handle(ctx context.Context, cmd ProcessSavedE
return errors.Wrap(err, "error saving relays and contacts")
}

// Publish the event to the nostr-events Google pubsub.
if err := h.externalEventPublisher.PublishNewEventReceived(ctx, event); err != nil {
return errors.Wrap(err, "error publishing the external event")
}
@@ -200,6 +201,7 @@ func ShouldSendEventToRelay(event Event) bool {
return false
}

// Check if the event is too old.
if !pushToRelayFilter.IsOk(event) {
return false
}
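The new comment above says pushToRelayFilter.IsOk rejects events that are too old, but the filter's implementation is not part of this diff. A minimal sketch of an age-based check under that assumption; the type and method signature here are stand-ins, not the package's real definitions:

package main

import (
	"fmt"
	"time"
)

// pushToRelayFilter is hypothetical here: the diff only states that the check
// rejects events that are too old, so this sketch compares the event's
// creation time against a maximum age.
type pushToRelayFilter struct {
	maxAge time.Duration
}

func (f pushToRelayFilter) IsOk(createdAt time.Time) bool {
	return time.Since(createdAt) <= f.maxAge
}

func main() {
	f := pushToRelayFilter{maxAge: 24 * time.Hour}
	fmt.Println(f.IsOk(time.Now()))                       // true: recent event
	fmt.Println(f.IsOk(time.Now().Add(-48 * time.Hour)))  // false: too old
}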
9 changes: 9 additions & 0 deletions service/domain/downloader/downloader.go
@@ -129,6 +129,10 @@ func NewDownloader(
}
}

// Run fetches events for the pubkeys stored in the database and for a hardcoded list of kinds, from each relay found in our database.
// These are used to create tasks that specify nostr filters and contain a time window controlling the since and until filter keys.
// These filters are used to start queries against each relay. Events found this
// way are published to all subscribers of the downloader publisher.
func (d *Downloader) Run(ctx context.Context) error {
go d.storeMetricsLoop(ctx)

@@ -167,6 +171,7 @@ func (d *Downloader) storeMetrics() {
d.metrics.ReportNumberOfRelayDownloaders(len(d.relayDownloaders))
}

// For each relay returned by getRelays(), start a downloader; kill downloaders whose relays are no longer in the list.
func (d *Downloader) updateDownloaders(ctx context.Context) error {
relays, err := d.getRelays(ctx)
if err != nil {
@@ -216,6 +221,7 @@ func (d *Downloader) updateDownloaders(ctx context.Context) error {
return nil
}

// Get the bootstrap relays and those already in the database.
func (d *Downloader) getRelays(ctx context.Context) (*internal.Set[domain.RelayAddress], error) {
result := internal.NewEmptySet[domain.RelayAddress]()

@@ -268,6 +274,7 @@ func NewRelayDownloader(
return v
}

// Start fetches tasks for the current relay, uses them to query it, and publishes the resulting events to a pubsub.
func (d *RelayDownloader) Start(ctx context.Context) error {
ch, err := d.scheduler.GetTasks(ctx, d.address)
if err != nil {

go func() {
for task := range ch {
// Use the task's filter to fetch events from the relay for the task's time window.
go d.performTask(task)
}
}()
@@ -290,6 +298,7 @@ func (d *RelayDownloader) performTask(task Task) {
}
}

// Run the filter specified by the task and, for each event found, publish it to all subscribers.
func (d *RelayDownloader) performTaskWithErr(task Task) error {
ch, err := d.relayConnections.GetEvents(task.Ctx(), d.address, task.Filter())
if err != nil {
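The comments added to the downloader describe tasks whose time windows drive the since and until keys of the nostr filters used to query each relay. The real Task and Filter types are not shown in this diff; a small sketch of that relationship with simplified, assumed types:

package main

import (
	"fmt"
	"time"
)

// TimeWindow and Filter are simplified stand-ins to illustrate the idea above:
// a task's time window fills in the since and until keys of a nostr-style
// filter, and each relay is queried with that filter.
type TimeWindow struct {
	Start    time.Time
	Duration time.Duration
}

type Filter struct {
	Kinds   []int
	Authors []string
	Since   time.Time
	Until   time.Time
}

// filterForWindow builds a filter limited to the given window.
func filterForWindow(w TimeWindow, kinds []int, authors []string) Filter {
	return Filter{
		Kinds:   kinds,
		Authors: authors,
		Since:   w.Start,
		Until:   w.Start.Add(w.Duration),
	}
}

func main() {
	window := TimeWindow{Start: time.Now().Add(-time.Hour), Duration: time.Hour}
	f := filterForWindow(window, []int{0, 3, 10002}, []string{"somepubkeyhex"})
	fmt.Printf("querying kinds %v since %v until %v\n",
		f.Kinds, f.Since.Format(time.RFC3339), f.Until.Format(time.RFC3339))
}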
25 changes: 23 additions & 2 deletions service/domain/downloader/scheduler.go
@@ -36,6 +36,7 @@ type Scheduler interface {
GetTasks(ctx context.Context, relay domain.RelayAddress) (<-chan Task, error)
}

// A TaskScheduler is responsible for generating tasks for all relays by maintaining a list of task generators.
type TaskScheduler struct {
taskGeneratorsLock sync.Mutex
taskGenerators map[domain.RelayAddress]*RelayTaskGenerator
@@ -65,6 +66,7 @@ func (t *TaskScheduler) GetTasks(ctx context.Context, relay domain.RelayAddress)
}

ch := make(chan Task)
// The subscription for the relay will generate a sequence of tasks mapped to time windows.
generator.AddSubscription(ctx, ch)
return ch, nil
}
@@ -155,6 +157,16 @@ func newTaskSubscription(ctx context.Context, ch chan Task) *taskSubscription {
}
}

// A RelayTaskGenerator is responsible for generating tasks for a single relay.
// Each task provides the time window for each query (since and until) and keeps
// track of how many queries we can run against the relay at once; we only
// allow 3 running queries at a time (for global, author and tag queries)
// across all subscription channels.

// RelayTaskGenerator maintains 3 TimeWindowTaskGenerator, one for each query
// type. Each TimeWindowTaskGenerator maintains a list of TimeWindowTaskTracker,
// one for each time window. Each TimeWindowTaskTracker maintains a list of
// runningRelayDownloader, one for each concurrency setting. Each TimeWindowTaskTracker uses a TimeWindow
Member commented:

Interesting. We have very similar code to this in the iOS app.

Contributor (author) replied:

Maybe this should be extracted into a library at some point.

type RelayTaskGenerator struct {
lock sync.Mutex

@@ -213,6 +225,7 @@ func (t *RelayTaskGenerator) SendOutTasks(publicKeys *PublicKeysToReplicate) (bo
t.lock.Lock()
defer t.lock.Unlock()

// Create tags for the public keys.
var pTags []domain.FilterTag
for _, publicKey := range publicKeys.Tagged() {
tag, err := domain.NewFilterTag(domain.TagProfile, publicKey.Hex())
@@ -222,6 +235,7 @@ func (t *RelayTaskGenerator) SendOutTasks(publicKeys *PublicKeysToReplicate) (bo
pTags = append(pTags, tag)
}

// Delete each done subscription.
slices.DeleteFunc(t.taskSubscriptions, func(subscription *taskSubscription) bool {
select {
case <-subscription.ctx.Done():
@@ -233,6 +247,7 @@ func (t *RelayTaskGenerator) SendOutTasks(publicKeys *PublicKeysToReplicate) (bo

sentTasksForAtLeastOneSubscription := false
for _, taskSubscription := range t.taskSubscriptions {
// Send a task for each subscription
numberOfSentTasks, err := t.pushTasks(taskSubscription.ctx, taskSubscription.ch, publicKeys.Authors(), pTags)
if err != nil {
return false, errors.Wrap(err, "error sending out generators")
@@ -245,6 +260,7 @@ func (t *RelayTaskGenerator) SendOutTasks(publicKeys *PublicKeysToReplicate) (bo
return sentTasksForAtLeastOneSubscription, nil
}

// Pushes tasks to the task channel. If the current tasks are not done yet, nothing is pushed.
func (t *RelayTaskGenerator) pushTasks(ctx context.Context, ch chan<- Task, authors []domain.PublicKey, tags []domain.FilterTag) (int, error) {
tasks, err := t.getTasksToPush(ctx, authors, tags)
if err != nil {
@@ -317,12 +333,17 @@ func NewTimeWindowTaskGenerator(
}, nil
}

// A task generator creates a task tracker per concurrency setting. The tracker
// is used to return the corresponding task; if the task is still running
// it returns no task. If the task is done it discards the current
// tracker, creates a new one, and returns a new task.
// Each generated task is pushed to all subscribers of the scheduler.
func (t *TimeWindowTaskGenerator) Generate(ctx context.Context, kinds []domain.EventKind, authors []domain.PublicKey, tags []domain.FilterTag) ([]Task, error) {
t.lock.Lock()
defer t.lock.Unlock()

t.taskTrackers = slices.DeleteFunc(t.taskTrackers, func(task *TimeWindowTaskTracker) bool {
return task.CheckIfDoneAndEnd()
t.taskTrackers = slices.DeleteFunc(t.taskTrackers, func(tracker *TimeWindowTaskTracker) bool {
return tracker.CheckIfDoneAndEnd()
})

for i := len(t.taskTrackers); i < timeWindowTaskConcurrency; i++ {
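The comments added to the scheduler describe trackers that are discarded once done and replaced up to a fixed concurrency limit, which is what the visible loop in Generate tops up to. A compact sketch of that top-up step; the tracker type below is a stand-in, not the package's real TimeWindowTaskTracker:

package main

import (
	"fmt"
	"slices"
)

// Trackers that report themselves done are removed, then new trackers are
// created until the concurrency limit is reached again.
const timeWindowTaskConcurrency = 3

type tracker struct{ done bool }

func (t *tracker) CheckIfDoneAndEnd() bool { return t.done }

func topUp(trackers []*tracker) []*tracker {
	trackers = slices.DeleteFunc(trackers, func(tr *tracker) bool {
		return tr.CheckIfDoneAndEnd()
	})
	for i := len(trackers); i < timeWindowTaskConcurrency; i++ {
		trackers = append(trackers, &tracker{})
	}
	return trackers
}

func main() {
	trackers := []*tracker{{done: true}, {done: false}}
	trackers = topUp(trackers)
	fmt.Println(len(trackers)) // 3: the done tracker was replaced and the pool refilled
}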
12 changes: 9 additions & 3 deletions service/domain/downloader/scheduler_test.go
@@ -42,7 +42,8 @@ func TestTaskScheduler_SchedulerWaitsForTasksToCompleteBeforeProducingMore(t *te
ch, err := ts.Scheduler.GetTasks(ctx, fixtures.SomeRelayAddress())
require.NoError(t, err)

tasks := collectAllTasks(ctx, ch, false)
completeTasks := false
tasks := collectAllTasks(ctx, ch, completeTasks)

require.EventuallyWithT(t, func(t *assert.CollectT) {
require.Equal(t, numberOfTaskTypes, len(tasks.Tasks()))
@@ -67,9 +68,12 @@ func TestTaskScheduler_SchedulerDoesNotProduceEmptyTasks(t *testing.T) {
ch, err := ts.Scheduler.GetTasks(ctx, fixtures.SomeRelayAddress())
require.NoError(t, err)

tasks := collectAllTasks(ctx, ch, false)
completeTasks := false
tasks := collectAllTasks(ctx, ch, completeTasks)

<-time.After(5 * time.Second)

// No public keys, so no tasks for authors or p-tags, but still one for event kinds.
require.Len(t, tasks.Tasks(), 1)
}

@@ -91,14 +95,16 @@ func TestTaskScheduler_SchedulerProducesTasksFromSequentialTimeWindowsLeadingUpT
ch, err := ts.Scheduler.GetTasks(ctx, fixtures.SomeRelayAddress())
require.NoError(t, err)

tasks := collectAllTasks(ctx, ch, true)
completeTasks := true
tasks := collectAllTasks(ctx, ch, completeTasks)

require.EventuallyWithT(t, func(t *assert.CollectT) {
filters := make(map[downloader.TimeWindow][]domain.Filter)
for _, task := range tasks.Tasks() {
start := task.Filter().Since()
duration := task.Filter().Until().Sub(*start)
window := downloader.MustNewTimeWindow(*start, duration)
// There will be 3 tasks per window: one for the kind filter, one for the authors filter, and one for the tags filter.
filters[window] = append(filters[window], task.Filter())
}

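The test comment notes that each time window should yield 3 tasks: one for the kind filter, one for the authors filter, and one for the tags filter. The grouping the test performs, keyed by the window reconstructed from each filter's since/until, can be illustrated with simplified stand-in types (the window and filter structs here are assumptions, not the package's real ones):

package main

import (
	"fmt"
	"time"
)

// window and filter are minimal stand-ins: filters that share the same
// since/until pair belong to the same time window.
type window struct {
	since time.Time
	until time.Time
}

type filter struct {
	name  string
	since time.Time
	until time.Time
}

func groupByWindow(filters []filter) map[window][]filter {
	grouped := make(map[window][]filter)
	for _, f := range filters {
		w := window{since: f.since, until: f.until}
		grouped[w] = append(grouped[w], f)
	}
	return grouped
}

func main() {
	start := time.Now().Truncate(time.Hour)
	end := start.Add(time.Hour)
	fs := []filter{
		{"kinds", start, end},
		{"authors", start, end},
		{"tags", start, end},
	}
	for w, group := range groupByWindow(fs) {
		fmt.Printf("window %v..%v has %d filters\n", w.since, w.until, len(group))
	}
}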
5 changes: 4 additions & 1 deletion service/domain/event_kind.go
@@ -12,7 +12,10 @@ var (
EventKindEncryptedDirectMessage = MustNewEventKind(4)
EventKindReaction = MustNewEventKind(7)
EventKindRelayListMetadata = MustNewEventKind(10002)
EventKindRegistration = MustNewEventKind(6666)
// TODO: This should be changed to 30078.
// See https://github.com/nostr-protocol/nips/blob/master/78.md; 6666 is
// reserved by NIP-90.
EventKindRegistration = MustNewEventKind(6666)
)

type EventKind struct {