Refactor rsslay to proactively retrieve feeds #21

Merged: 20 commits, Sep 18, 2023
211 changes: 64 additions & 147 deletions cmd/rsslay/main.go
@@ -4,7 +4,6 @@ import (
"context"
"database/sql"
_ "embed"
"errors"
"flag"
"fmt"
"log"
@@ -25,13 +24,18 @@ import (
"github.com/nbd-wtf/go-nostr/nip11"
"github.com/piraces/rsslay/internal/handlers"
"github.com/piraces/rsslay/pkg/custom_cache"
"github.com/piraces/rsslay/pkg/events"
"github.com/piraces/rsslay/pkg/feed"
"github.com/piraces/rsslay/pkg/metrics"
"github.com/piraces/rsslay/pkg/new/adapters"
pubsubadapters "github.com/piraces/rsslay/pkg/new/adapters/pubsub"
"github.com/piraces/rsslay/pkg/new/app"
"github.com/piraces/rsslay/pkg/new/domain"
"github.com/piraces/rsslay/pkg/new/ports"
pubsub2 "github.com/piraces/rsslay/pkg/new/ports/pubsub"
"github.com/piraces/rsslay/pkg/replayer"
"github.com/piraces/rsslay/scripts"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/exp/slices"
)

// Command line flags.
@@ -63,13 +67,14 @@ type Relay struct {
RedisConnectionString string `envconfig:"REDIS_CONNECTION_STRING" default:""`

updates chan nostr.Event
lastEmitted sync.Map
db *sql.DB
healthCheck *health.Health
mutex sync.Mutex
routineQueueLength int
converterSelector *feed.ConverterSelector
cache *cache.Cache[string]
handler *handlers.Handler
store *store
}

var relayInstance = &Relay{
@@ -114,20 +119,20 @@ func (r *Relay) Name() string {

func (r *Relay) OnInitialized(s *relayer.Server) {
s.Router().Path("/").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
handlers.HandleWebpage(writer, request, r.db, &r.MainDomainName)
r.handler.HandleWebpage(writer, request, &r.MainDomainName)
})
s.Router().Path("/create").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
handlers.HandleCreateFeed(writer, request, r.db, &r.Secret, dsn)
r.handler.HandleCreateFeed(writer, request, dsn)
})
s.Router().Path("/search").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
handlers.HandleSearch(writer, request, r.db)
r.handler.HandleSearch(writer, request)
})
s.Router().
PathPrefix(assetsDir).
Handler(http.StripPrefix(assetsDir, http.FileServer(http.Dir("./web/"+assetsDir))))
s.Router().Path("/healthz").HandlerFunc(relayInstance.healthCheck.HandlerFunc)
s.Router().Path("/api/feed").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
handlers.HandleApiFeed(writer, request, r.db, &r.Secret, dsn)
r.handler.HandleApiFeed(writer, request, dsn)
})
s.Router().Path("/.well-known/nostr.json").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
handlers.HandleNip05(writer, request, r.db, &r.OwnerPublicKey, &r.EnableAutoNIP05Registration)
@@ -136,6 +141,8 @@ func (r *Relay) OnInitialized(s *relayer.Server) {
}

func (r *Relay) Init() error {
ctx := context.TODO()

flag.Parse()
err := envconfig.Process("", r)
if err != nil {
@@ -145,54 +152,58 @@ func (r *Relay) Init() error {
}

ConfigureCache()
r.db = InitDatabase(r)

go r.UpdateListeningFilters()

longFormConverter := feed.NewLongFormConverter()
r.converterSelector = feed.NewConverterSelector(longFormConverter)
db := InitDatabase(r)
feedDefinitionStorage := adapters.NewFeedDefinitionStorage(db)
eventStorage := adapters.NewEventStorage()
receivedEventPubSub := pubsubadapters.NewReceivedEventPubSub()

return nil
}
secret, err := domain.NewSecret(r.Secret)
if err != nil {
return errors.Wrap(err, "error creating a secret")
}

func (r *Relay) UpdateListeningFilters() {
for {
time.Sleep(20 * time.Minute)
metrics.ListeningFiltersOps.Inc()

filters := relayer.GetListeningFilters()
log.Printf("[DEBUG] Checking for updates; %d filters active", len(filters))

var parsedEvents []replayer.EventWithPrivateKey
for _, filter := range filters {
if filter.Kinds == nil || slices.Contains(filter.Kinds, nostr.KindTextNote) {
for _, pubkey := range filter.Authors {
parsedFeed, entity := events.GetParsedFeedForPubKey(pubkey, r.db, r.DeleteFailingFeeds, r.NitterInstances)
if parsedFeed == nil {
continue
}

converter := r.converterSelector.Select(parsedFeed)

for _, item := range parsedFeed.Items {
defaultCreatedAt := time.Unix(time.Now().Unix(), 0)
evt := converter.Convert(pubkey, item, parsedFeed, defaultCreatedAt, entity.URL)
last, ok := r.lastEmitted.Load(entity.URL)
if last == nil {
last = uint32(time.Now().Unix())
}
if !ok || nostr.Timestamp(int64(last.(uint32))) < evt.CreatedAt {
_ = evt.Sign(entity.PrivateKey)
r.updates <- evt
r.lastEmitted.Store(entity.URL, last.(uint32))
parsedEvents = append(parsedEvents, replayer.EventWithPrivateKey{Event: &evt, PrivateKey: entity.PrivateKey})
}
}
}
}
}
r.AttemptReplayEvents(parsedEvents)
r.converterSelector = feed.NewConverterSelector(feed.NewLongFormConverter())

handlerCreateFeedDefinition := app.NewHandlerCreateFeedDefinition(secret, feedDefinitionStorage)
handlerUpdateFeeds := app.NewHandlerUpdateFeeds(
r.DeleteFailingFeeds,
r.NitterInstances,
r.EnableAutoNIP05Registration,
r.DefaultProfilePictureUrl,
r.MainDomainName,
db,
feedDefinitionStorage,
r.converterSelector,
eventStorage,
receivedEventPubSub,
)
handlerGetEvents := app.NewHandlerGetEvents(eventStorage)
handlerOnNewEventCreated := app.NewHandlerOnNewEventCreated(r.updates)
handlerGetTotalFeedCount := app.NewHandlerGetTotalFeedCount(feedDefinitionStorage)
handlerGetRandomFeeds := app.NewHandlerGetRandomFeeds(feedDefinitionStorage)
handlerSearchFeeds := app.NewHandlerSearchFeeds(feedDefinitionStorage)

updateFeedsTimer := ports.NewUpdateFeedsTimer(handlerUpdateFeeds)
receivedEventSubscriber := pubsub2.NewReceivedEventSubscriber(receivedEventPubSub, handlerOnNewEventCreated)

app := app.App{
CreateFeedDefinition: handlerCreateFeedDefinition,
UpdateFeeds: handlerUpdateFeeds,
GetEvents: handlerGetEvents,
GetTotalFeedCount: handlerGetTotalFeedCount,
GetRandomFeeds: handlerGetRandomFeeds,
SearchFeeds: handlerSearchFeeds,
}

r.db = db
r.handler = handlers.NewHandler(app)
r.store = newStore(app)

go updateFeedsTimer.Run(ctx)
go receivedEventSubscriber.Run(ctx)

return nil
}

func (r *Relay) AttemptReplayEvents(events []replayer.EventWithPrivateKey) {
Expand All @@ -217,101 +228,7 @@ func (r *Relay) AcceptEvent(_ *nostr.Event) bool {
}

func (r *Relay) Storage() relayer.Storage {
return store{r.db}
}

type store struct {
db *sql.DB
}

func (b store) Init() error { return nil }
func (b store) SaveEvent(_ *nostr.Event) error {
metrics.InvalidEventsRequests.Inc()
return errors.New("blocked: we don't accept any events")
}

func (b store) DeleteEvent(_, _ string) error {
metrics.InvalidEventsRequests.Inc()
return errors.New("blocked: we can't delete any events")
}

func (b store) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) {
var parsedEvents []nostr.Event
var eventsToReplay []replayer.EventWithPrivateKey

metrics.QueryEventsRequests.Inc()

if filter.IDs != nil || len(filter.Tags) > 0 {
return parsedEvents, nil
}

for _, pubkey := range filter.Authors {
parsedFeed, entity := events.GetParsedFeedForPubKey(pubkey, relayInstance.db, relayInstance.DeleteFailingFeeds, relayInstance.NitterInstances)

if parsedFeed == nil {
continue
}

converter := relayInstance.converterSelector.Select(parsedFeed)

if filter.Kinds == nil || slices.Contains(filter.Kinds, nostr.KindSetMetadata) {
evt := feed.EntryFeedToSetMetadata(pubkey, parsedFeed, entity.URL, relayInstance.EnableAutoNIP05Registration, relayInstance.DefaultProfilePictureUrl, relayInstance.MainDomainName)

if filter.Since != nil && evt.CreatedAt < *filter.Since {
continue
}
if filter.Until != nil && evt.CreatedAt > *filter.Until {
continue
}

_ = evt.Sign(entity.PrivateKey)
parsedEvents = append(parsedEvents, evt)
if relayInstance.ReplayToRelays {
eventsToReplay = append(eventsToReplay, replayer.EventWithPrivateKey{Event: &evt, PrivateKey: entity.PrivateKey})
}
}

if filter.Kinds == nil || slices.Contains(filter.Kinds, nostr.KindTextNote) || slices.Contains(filter.Kinds, feed.KindLongFormTextContent) {
var last uint32 = 0
for _, item := range parsedFeed.Items {
defaultCreatedAt := time.Unix(time.Now().Unix(), 0)
evt := converter.Convert(pubkey, item, parsedFeed, defaultCreatedAt, entity.URL)

// Feeds need to have a date for each entry...
if evt.CreatedAt == nostr.Timestamp(defaultCreatedAt.Unix()) {
continue
}

if filter.Since != nil && evt.CreatedAt < *filter.Since {
continue
}
if filter.Until != nil && evt.CreatedAt > *filter.Until {
continue
}

_ = evt.Sign(entity.PrivateKey)

if !filter.Matches(&evt) {
continue
}

if evt.CreatedAt > nostr.Timestamp(int64(last)) {
last = uint32(evt.CreatedAt)
}

parsedEvents = append(parsedEvents, evt)
if relayInstance.ReplayToRelays {
eventsToReplay = append(eventsToReplay, replayer.EventWithPrivateKey{Event: &evt, PrivateKey: entity.PrivateKey})
}
}

relayInstance.lastEmitted.Store(entity.URL, last)
}
}

relayInstance.AttemptReplayEvents(eventsToReplay)

return parsedEvents, nil
return r.store
}

func (r *Relay) InjectEvents() chan nostr.Event {
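For readers skimming the diff: the "proactive" behavior in the PR title comes from `ports.NewUpdateFeedsTimer`, whose implementation is not shown in the hunks above. A minimal sketch of what such a timer could look like, with the single-method handler interface, the 20-minute interval (matching the old `UpdateListeningFilters` loop), and the logging all being assumptions rather than the PR's actual code:

```go
package ports

import (
	"context"
	"log"
	"time"
)

// UpdateFeedsHandler is the slice of the app layer the timer drives; the
// single-method shape is an assumption based on the wiring in main.go.
type UpdateFeedsHandler interface {
	Handle(ctx context.Context) error
}

// UpdateFeedsTimer refreshes every known feed on a schedule instead of
// waiting for a subscription filter to ask for one.
type UpdateFeedsTimer struct {
	handler UpdateFeedsHandler
}

func NewUpdateFeedsTimer(handler UpdateFeedsHandler) *UpdateFeedsTimer {
	return &UpdateFeedsTimer{handler: handler}
}

// Run blocks, updating feeds once immediately and then on a fixed interval,
// until the context is cancelled. The interval is an assumed value.
func (t *UpdateFeedsTimer) Run(ctx context.Context) {
	ticker := time.NewTicker(20 * time.Minute)
	defer ticker.Stop()
	for {
		if err := t.handler.Handle(ctx); err != nil {
			log.Printf("error updating feeds: %s", err)
		}
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}
```

Compared with the deleted `UpdateListeningFilters` loop, which only refreshed feeds for pubkeys that appeared in currently active filters, this design updates feeds unconditionally, which is what lets `QueryEvents` become a pure read.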
51 changes: 51 additions & 0 deletions cmd/rsslay/store.go
@@ -0,0 +1,51 @@
package main

import (
"github.com/nbd-wtf/go-nostr"
"github.com/piraces/rsslay/pkg/metrics"
"github.com/piraces/rsslay/pkg/new/app"
nostrdomain "github.com/piraces/rsslay/pkg/new/domain/nostr"
"github.com/pkg/errors"
)

type store struct {
app app.App
}

func newStore(app app.App) *store {
return &store{app: app}
}

func (b store) Init() error {
return nil
}

func (b store) SaveEvent(_ *nostr.Event) error {
metrics.InvalidEventsRequests.Inc()
return errors.New("blocked: we don't accept any events")
}

func (b store) DeleteEvent(_, _ string) error {
metrics.InvalidEventsRequests.Inc()
return errors.New("blocked: we can't delete any events")
}

func (b store) QueryEvents(libfilter *nostr.Filter) ([]nostr.Event, error) {
metrics.QueryEventsRequests.Inc()

filter := nostrdomain.NewFilter(libfilter)
events, err := b.app.GetEvents.Handle(filter)
if err != nil {
return nil, errors.Wrap(err, "error getting events")
}

return b.toEvents(events), nil
}

func (b store) toEvents(events []nostrdomain.Event) []nostr.Event {
var result []nostr.Event
for _, event := range events {
result = append(result, event.Libevent())
}
return result
}
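The new `store` above no longer touches the database or parses feeds; it delegates to `app.GetEvents`, which reads from the `adapters.NewEventStorage` instance wired up in `Init`. That storage is not shown in this diff. A minimal sketch of what an in-memory implementation could look like, using go-nostr's library types directly for brevity instead of the PR's `nostrdomain` types (the `Matches` call is the same filter check the old `QueryEvents` used):

```go
package adapters

import (
	"sync"

	"github.com/nbd-wtf/go-nostr"
)

// EventStorage holds events produced by the periodic feed updates so that
// QueryEvents can answer from memory instead of fetching feeds on demand.
type EventStorage struct {
	mutex  sync.Mutex
	events []nostr.Event
}

func NewEventStorage() *EventStorage {
	return &EventStorage{}
}

// PutEvent records a freshly converted feed event.
func (s *EventStorage) PutEvent(event nostr.Event) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.events = append(s.events, event)
}

// GetEvents returns all stored events matching the filter.
func (s *EventStorage) GetEvents(filter nostr.Filter) []nostr.Event {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	var result []nostr.Event
	for _, event := range s.events {
		event := event
		if filter.Matches(&event) {
			result = append(result, event)
		}
	}
	return result
}
```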
2 changes: 2 additions & 0 deletions go.mod
@@ -47,6 +47,8 @@ require (
github.com/gorilla/css v1.0.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
4 changes: 4 additions & 0 deletions go.sum
@@ -109,6 +109,10 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hellofresh/health-go/v5 v5.3.0 h1:T0tapAAuqVIiagRn0YQzFoIPAQek120/vQYPxpMMJ9M=
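One more piece the hunks reference but do not show: `Init` wires a small in-process pub/sub (`pubsubadapters.NewReceivedEventPubSub` plus `pubsub2.NewReceivedEventSubscriber`) between the update handler and the relay's `updates` channel. A minimal sketch under assumed names and signatures, again using go-nostr's event type directly; the PR's actual implementation may differ:

```go
package pubsub

import (
	"context"

	"github.com/nbd-wtf/go-nostr"
)

// ReceivedEventPubSub decouples the feed-update handler, which produces
// events, from the relay, which pushes them to connected clients.
type ReceivedEventPubSub struct {
	ch chan nostr.Event
}

func NewReceivedEventPubSub() *ReceivedEventPubSub {
	return &ReceivedEventPubSub{ch: make(chan nostr.Event)}
}

// PublishNewEventCreated hands an event to the subscriber, giving up if the
// context is cancelled first.
func (p *ReceivedEventPubSub) PublishNewEventCreated(ctx context.Context, event nostr.Event) {
	select {
	case p.ch <- event:
	case <-ctx.Done():
	}
}

// ReceivedEventSubscriber forwards published events to a handler; in main.go
// that handler feeds the relay's updates channel.
type ReceivedEventSubscriber struct {
	pubSub  *ReceivedEventPubSub
	handler func(nostr.Event)
}

func NewReceivedEventSubscriber(pubSub *ReceivedEventPubSub, handler func(nostr.Event)) *ReceivedEventSubscriber {
	return &ReceivedEventSubscriber{pubSub: pubSub, handler: handler}
}

// Run consumes published events until the context is cancelled.
func (s *ReceivedEventSubscriber) Run(ctx context.Context) {
	for {
		select {
		case event := <-s.pubSub.ch:
			s.handler(event)
		case <-ctx.Done():
			return
		}
	}
}
```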