Skip to content

Commit

Permalink
update to use new event handling
Browse files Browse the repository at this point in the history
This adds the new x/events handling.

Note:

There are some issues with detecting Ack/Nack's on messages.
Which has resulted in some tests not being able to be done that check
for Nacked messages.

Signed-off-by: Mike Mason <[email protected]>
  • Loading branch information
mikemrm committed Jun 16, 2023
1 parent e0e9e00 commit 0dea11d
Show file tree
Hide file tree
Showing 8 changed files with 375 additions and 593 deletions.
23 changes: 8 additions & 15 deletions cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.infratographer.com/x/events"
"go.infratographer.com/x/otelx"

"go.infratographer.com/permissions-api/internal/config"
Expand All @@ -25,7 +26,7 @@ func init() {
rootCmd.AddCommand(workerCmd)

otelx.MustViperFlags(viper.GetViper(), workerCmd.Flags())
pubsub.MustViperFlags(viper.GetViper(), workerCmd.Flags())
events.MustViperFlagsForSubscriber(viper.GetViper(), workerCmd.Flags())
}

func worker(ctx context.Context, cfg *config.AppConfig) {
Expand All @@ -41,20 +42,12 @@ func worker(ctx context.Context, cfg *config.AppConfig) {

engine := query.NewEngine("infratographer", spiceClient)

logger.Infow("client config", "client_config", cfg.PubSub)

client := pubsub.NewClient(
cfg.PubSub,
pubsub.WithQueryEngine(engine),
pubsub.WithResourceTypeNames(
[]string{
"tenant",
},
),
pubsub.WithLogger(logger),
)
subscriber, err := pubsub.NewSubscriber(ctx, cfg.Events.Subscriber, engine)
if err != nil {
logger.Fatalw("unable to initialize subscriber", "error", err)
}

if err := client.Listen(); err != nil {
if err := subscriber.Listen(); err != nil {
logger.Fatalw("error listening for events", "error", err)
}

Expand All @@ -63,7 +56,7 @@ func worker(ctx context.Context, cfg *config.AppConfig) {

logger.Infof("received %s signal, stopping", sig)

err = client.Stop()
err = subscriber.Close()
if err != nil {
logger.Fatalw("error stopping NATS client", "error", err)
}
Expand Down
61 changes: 32 additions & 29 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,26 @@ module go.infratographer.com/permissions-api
go 1.20

require (
github.com/ThreeDotsLabs/watermill v1.2.0
github.com/authzed/authzed-go v0.8.0
github.com/authzed/grpcutil v0.0.0-20230524151342-4caf7fd1108a
github.com/labstack/echo/v4 v4.10.2
github.com/nats-io/nats-server/v2 v2.9.17
github.com/nats-io/nats.go v1.24.0
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.3
go.infratographer.com/x v0.0.15
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.40.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.40.0
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/trace v1.14.0
github.com/spf13/viper v1.16.0
github.com/stretchr/testify v1.8.4
go.infratographer.com/x v0.3.1
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
go.uber.org/zap v1.24.0
google.golang.org/grpc v1.55.0
)

require (
github.com/MicahParks/keyfunc/v2 v2.0.3 // indirect
github.com/ThreeDotsLabs/watermill-nats/v2 v2.0.0 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
Expand All @@ -32,11 +32,13 @@ require (
github.com/envoyproxy/protoc-gen-validate v0.10.0 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/garsue/watermillzap v1.2.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand All @@ -45,9 +47,10 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/jzelinskie/stringz v0.0.0-20210414224931-d6a8ce844a70 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/labstack/echo-contrib v0.14.1 // indirect
github.com/labstack/echo-contrib v0.15.0 // indirect
github.com/labstack/echo-jwt/v4 v4.2.0 // indirect
github.com/labstack/gommon v0.4.0 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.18 // indirect
Expand All @@ -57,45 +60,45 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.4.1 // indirect
github.com/nats-io/nats-server/v2 v2.9.17 // indirect
github.com/nats-io/nats.go v1.26.0 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.40.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.40.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.opentelemetry.io/otel/sdk v1.14.0 // indirect
go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.42.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/sdk v1.16.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 0dea11d

Please sign in to comment.