Skip to content

Commit

Permalink
Merge pull request #79 from planetary-social/more_validations
Browse files Browse the repository at this point in the history
More validations and check it before push to queue
  • Loading branch information
dcadenas authored Feb 7, 2024
2 parents 377ee26 + 5a7a7c0 commit c109f06
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 12 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/ThreeDotsLabs/watermill-googlecloud v1.0.13
github.com/boreq/errors v0.1.0
github.com/boreq/rest v0.1.0
github.com/davecgh/go-spew v1.1.1
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0
github.com/google/wire v0.5.0
github.com/gorilla/mux v1.8.1
Expand Down Expand Up @@ -36,7 +37,6 @@ require (
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
Expand Down
57 changes: 57 additions & 0 deletions internal/logging/log.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package logging

import (
"encoding/json"
"fmt"
"os"
"os/exec"
"reflect"
"runtime"

"github.com/davecgh/go-spew/spew"
)

const (
loggerFieldName = "name"
loggerFieldError = "error"
Expand Down Expand Up @@ -190,3 +201,49 @@ func (d devNullLoggerEntry) WithField(key string, v any) Entry {

func (d devNullLoggerEntry) Message(msg string) {
}

func Inspect(args ...interface{}) {
for _, arg := range args {
val := reflect.ValueOf(arg)

if val.Kind() == reflect.String {
fmt.Println(arg)
continue
}
// Use String() or MarshalJSON if available
if val.CanInterface() {
if marshaler, ok := arg.(json.Marshaler); ok {
if jsonBytes, err := marshaler.MarshalJSON(); err == nil {
fmt.Println(string(jsonBytes))
continue
}
}
if stringer, ok := arg.(fmt.Stringer); ok {
fmt.Println(stringer.String())
continue
}
}

spew.Dump(arg)
}
println()
}

// Debugging function for development to use voice messages to detect issues
// in the code.
var Say func(string)

func init() {
if os.Getenv("EVENTS_ENVIRONMENT") != "PRODUCTION" && runtime.GOOS == "darwin" {
Say = func(text string) {
cmd := exec.Command("say", text)
if err := cmd.Run(); err != nil {
fmt.Println("Failed to execute say command:", err)
}
}
} else {
Say = func(text string) {
// This function intentionally left blank.
}
}
}
2 changes: 1 addition & 1 deletion service/app/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,5 @@ type Event interface {
Kind() domain.EventKind
Tags() []domain.EventTag
Raw() []byte
HasInvalidProfileTags() bool
IsInvalid() bool
}
5 changes: 3 additions & 2 deletions service/app/handler_process_saved_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewProcessSavedEventHandler(
}
}

// This processes the internal db based event queue, the event is already in the database
func (h *ProcessSavedEventHandler) Handle(ctx context.Context, cmd ProcessSavedEvent) (err error) {
defer h.metrics.StartApplicationCall("processSavedEvent").End(&err)

Expand Down Expand Up @@ -193,7 +194,7 @@ func (h *ProcessSavedEventHandler) maybeSendEventToRelay(ctx context.Context, ev
}

func (h *ProcessSavedEventHandler) shouldDisregardSendEventErr(err error) bool {
return errors.Is(err, relays.ErrEventReplaced)
return errors.Is(err, relays.ErrEventReplaced) || errors.Is(err, relays.ErrEventInvalid)
}

func ShouldSendEventToRelay(event Event) bool {
Expand All @@ -206,7 +207,7 @@ func ShouldSendEventToRelay(event Event) bool {
return false
}

if event.HasInvalidProfileTags() {
if event.IsInvalid() {
return false
}

Expand Down
3 changes: 3 additions & 0 deletions service/app/handler_save_received_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func NewSaveReceivedEventHandler(
}
}

// This handler is responsible for saving received events. It checks if the
// event should be saved and if so, it saves it and publishes the id to the
// internal db based event queue.
func (h *SaveReceivedEventHandler) Handle(ctx context.Context, cmd SaveReceivedEvent) (err error) {
defer h.metrics.StartApplicationCall("saveReceivedEvent").End(&err)

Expand Down
13 changes: 13 additions & 0 deletions service/domain/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func (d *RelayDownloader) performTask(task Task) {
// Run the filter specified by the task and for each event found publish it to all subscribers.
func (d *RelayDownloader) performTaskWithErr(task Task) error {
ch, err := d.relayConnections.GetEvents(task.Ctx(), d.address, task.Filter())
filterJson, _ := task.Filter().MarshalJSON()
if err != nil {
return errors.Wrap(err, "error getting events ch")
}
Expand All @@ -309,6 +310,18 @@ func (d *RelayDownloader) performTaskWithErr(task Task) error {
if eventOrEOSE.EOSE() {
task.OnReceivedEOSE()
} else {
event := eventOrEOSE.Event()

// Don't event push invalid events to the queue
if event.IsInvalid() {
d.logger.
Trace().
WithField("event", event).
WithField("address", d.address.String()).
WithField("filter", string(filterJson)).
Message("invalid event, skipping")
continue
}
d.metrics.ReportReceivedEvent(d.address)
d.receivedEventPublisher.Publish(d.address, eventOrEOSE.Event())
}
Expand Down
47 changes: 47 additions & 0 deletions service/domain/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package domain

import (
"encoding/json"
"net/url"
"time"

"github.com/boreq/errors"
Expand Down Expand Up @@ -55,9 +56,16 @@ func (u UnverifiedEvent) Tags() []EventTag {
return internal.CopySlice(u.event.tags)
}

func (u UnverifiedEvent) IsInvalid() bool {
return u.event.IsInvalid()
}

func (u UnverifiedEvent) HasInvalidProfileTags() bool {
return u.event.HasInvalidProfileTags()
}
func (u UnverifiedEvent) HasInvalidRTags() bool {
return u.event.HasInvalidRTags()
}

func (u UnverifiedEvent) Raw() []byte {
j, err := u.event.libevent.MarshalJSON()
Expand Down Expand Up @@ -128,10 +136,18 @@ func (e Event) Tags() []EventTag {
return internal.CopySlice(e.event.tags)
}

func (e Event) IsInvalid() bool {
return e.event.IsInvalid()
}

func (e Event) HasInvalidProfileTags() bool {
return e.event.HasInvalidProfileTags()
}

func (e Event) HasInvalidRTags() bool {
return e.event.HasInvalidRTags()
}

func (e Event) Content() string {
return e.event.content
}
Expand Down Expand Up @@ -226,3 +242,34 @@ func (e parsedEvent) HasInvalidProfileTags() bool {
}
return false
}

func (e parsedEvent) HasInvalidRTags() bool {
for _, tag := range e.tags {
if !tag.IsRelay() {
continue
}

relay, err := tag.Relay()
if err != nil {
return true
}

if len(relay.s) > 2000 {
return true
}

parsed, parseErr := url.Parse(relay.s)
if parseErr != nil {
return true
}

if parsed.Scheme != "wss" && parsed.Scheme != "ws" {
return true
}
}
return false
}

func (e parsedEvent) IsInvalid() bool {
return e.HasInvalidProfileTags() || e.HasInvalidRTags()
}
44 changes: 44 additions & 0 deletions service/domain/event_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package domain_test

import (
"strings"
"testing"

"github.com/planetary-social/nos-event-service/internal/fixtures"
Expand Down Expand Up @@ -53,3 +54,46 @@ func TestEvent_HasInvalidProfileTags(t *testing.T) {
})
}
}
func TestEvent_HasInvalidRTags(t *testing.T) {
largeString := strings.Repeat("a", 2000)

testCases := []struct {
Name string
Event domain.Event
Result bool
}{
{
Name: "valid_single_relay",
Event: fixtures.Event(fixtures.SomeEventKind(), []domain.EventTag{domain.MustNewEventTag([]string{"r", "wss://example-relay.com"})}, fixtures.SomeString()),
Result: false,
},
{
Name: "valid_multiple_relays",
Event: fixtures.Event(
fixtures.SomeEventKind(),
[]domain.EventTag{
domain.MustNewEventTag([]string{"r", "wss://relay1.com"}),
domain.MustNewEventTag([]string{"r", "wss://relay2.com", "read"}),
domain.MustNewEventTag([]string{"r", "wss://relay3.com", "write"}),
},
fixtures.SomeString(),
),
Result: false,
},
{
Name: "invalid_concatenated_relays",
Event: fixtures.Event(
fixtures.SomeEventKind(),
[]domain.EventTag{domain.MustNewEventTag([]string{"r", "wss://foobar.com" + largeString})},
fixtures.SomeString(),
),
Result: true,
},
}

for _, testCase := range testCases {
t.Run(testCase.Name, func(t *testing.T) {
require.Equal(t, testCase.Result, testCase.Event.HasInvalidRTags())
})
}
}
23 changes: 15 additions & 8 deletions service/domain/relays/event_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package relays

import (
"context"
"strings"

"github.com/boreq/errors"
"github.com/planetary-social/nos-event-service/service/domain"
)

var ErrEventReplaced = errors.New("relay has a newer event which replaced this event")
var ErrEventInvalid = errors.New("invalid event from relay")

type EventSender struct {
connections *RelayConnections
Expand All @@ -27,13 +29,18 @@ func (s *EventSender) SendEvent(ctx context.Context, address domain.RelayAddress

func (s *EventSender) maybeConvertError(err error) error {
var okResponseErr OKResponseError
if errors.As(err, &okResponseErr) {
switch okResponseErr.Reason() {
case "replaced: have newer event":
return ErrEventReplaced
default:
return err
}
if !errors.As(err, &okResponseErr) {
return err
}

reason := okResponseErr.Reason()

switch {
case reason == "replaced: have newer event":
return ErrEventReplaced
case strings.HasPrefix(reason, "invalid: "):
return ErrEventInvalid
default:
return err
}
return err
}

0 comments on commit c109f06

Please sign in to comment.