Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions streaming/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,16 @@ type (
)

// newReader creates a new reader.
func newReader(ctx context.Context, stream *Stream, opts ...options.Reader) (*Reader, error) {
func newReader(stream *Stream, opts ...options.Reader) (*Reader, error) {
o := options.ParseReaderOptions(opts...)
var eventFilter eventFilterFunc
if o.Topic != "" {
eventFilter = func(e *Event) bool { return e.Topic == o.Topic }
} else if o.TopicPattern != "" {
topicPatternRegexp := regexp.MustCompile(o.TopicPattern)
topicPatternRegexp, err := regexp.Compile(o.TopicPattern)
if err != nil {
return nil, fmt.Errorf("topic pattern must be a valid regex: %w", err)
}
eventFilter = func(e *Event) bool { return topicPatternRegexp.MatchString(e.Topic) }
}

Expand Down
8 changes: 6 additions & 2 deletions streaming/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ func TestNewReader(t *testing.T) {
assert.NoError(t, err)
reader, err := s.NewReader(ctx, options.WithReaderBlockDuration(testBlockDuration))
assert.NoError(t, err)
assert.NotNil(t, reader)
defer cleanupReader(t, ctx, s, reader)
if assert.NotNil(t, reader) {
defer cleanupReader(t, ctx, s, reader)
}

_, err = s.NewReader(ctx, options.WithReaderTopicPattern("("))
assert.EqualError(t, err, "topic pattern must be a valid regex: error parsing regexp: missing closing ): `(`")
}

func TestReaderReadOnce(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion streaming/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ func newSink(ctx context.Context, name string, stream *Stream, opts ...options.S
if o.Topic != "" {
eventMatcher = func(e *Event) bool { return e.Topic == o.Topic }
} else if o.TopicPattern != "" {
topicPatternRegexp := regexp.MustCompile(o.TopicPattern)
topicPatternRegexp, err := regexp.Compile(o.TopicPattern)
if err != nil {
return nil, fmt.Errorf("topic pattern must be a valid regex: %w", err)
}
eventMatcher = func(e *Event) bool { return topicPatternRegexp.MatchString(e.Topic) }
}

Expand Down
8 changes: 6 additions & 2 deletions streaming/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ func TestNewSink(t *testing.T) {
assert.NoError(t, err)
sink, err := s.NewSink(ctx, "sink", options.WithSinkBlockDuration(testBlockDuration))
assert.NoError(t, err)
assert.NotNil(t, sink)
cleanupSink(t, ctx, s, sink)
if assert.NotNil(t, sink) {
defer cleanupSink(t, ctx, s, sink)
}

_, err = s.NewSink(ctx, "sink", options.WithSinkTopicPattern("("))
assert.EqualError(t, err, "topic pattern must be a valid regex: error parsing regexp: missing closing ): `(`")
}

func TestReadOnce(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions streaming/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,9 @@ func NewStream(name string, rdb *redis.Client, opts ...options.Stream) (*Stream,
// - from the event added on or after the timestamp provided via
// WithReaderStartAt if still in the stream, oldest event otherwise
func (s *Stream) NewReader(ctx context.Context, opts ...options.Reader) (*Reader, error) {
reader, err := newReader(ctx, s, opts...)
reader, err := newReader(s, opts...)
if err != nil {
err := fmt.Errorf("failed to create reader: %w", err)
s.logger.Error(err)
s.logger.Error(fmt.Errorf("failed to create reader: %w", err))
return nil, err
}
s.logger.Info("create reader", "start", reader.startID)
Expand Down
10 changes: 7 additions & 3 deletions testing/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import (
"github.com/stretchr/testify/require"
)

// redisPwd is the default test redis password, overridden by REDIS_PASSWORD env var
var redisPwd = "redispassword"
var (
// redisPwd is the default test redis password, overridden by REDIS_PASSWORD env var
redisPwd = "redispassword"
// streamRegexp is a regular expression that matches valid stream keys
streamRegexp = regexp.MustCompile(`^pulse:stream:[^:]+:node:.*`)
)

func init() {
if p := os.Getenv("REDIS_PASSWORD"); p != "" {
Expand Down Expand Up @@ -45,7 +49,7 @@ func CleanupRedis(t *testing.T, rdb *redis.Client, checkClean bool, testName str
// Sinks content is cleaned up asynchronously, so ignore it
continue
}
if regexp.MustCompile(`^pulse:stream:[^:]+:node:.*`).MatchString(k) {
if streamRegexp.MatchString(k) {
// Node streams are cleaned up asynchronously, so ignore them
continue
}
Expand Down
Loading