Skip to content

Commit

Permalink
update to use new event handling (#109)
Browse files Browse the repository at this point in the history
* update to use new event handling

This adds the new x/events handling.

Note:

There are some issues with detecting Ack/Nack's on messages.
Which has resulted in some tests not being able to be done that check
for Nacked messages.

Signed-off-by: Mike Mason <[email protected]>

* update chart

Signed-off-by: Mike Mason <[email protected]>

* update eventtools to support waiting for acks

This enables us to run all tests which check for specific acks.

Signed-off-by: Mike Mason <[email protected]>

* work through review feedback

Signed-off-by: Mike Mason <[email protected]>

* ignore unrelated ids

This ignores unrelated ids instead of failing if an id is found that
isn't part of the schema.

Signed-off-by: Mike Mason <[email protected]>

* cache prefix id map for quicker lookups

Signed-off-by: Mike Mason <[email protected]>

* no longer tenant specific

Signed-off-by: Mike Mason <[email protected]>

* ensure consistent logging field names

Signed-off-by: Mike Mason <[email protected]>

* use latest x package with updated events testing

Signed-off-by: Mike Mason <[email protected]>

---------

Signed-off-by: Mike Mason <[email protected]>
  • Loading branch information
mikemrm authored Jun 19, 2023
1 parent 975b26d commit 4651abd
Show file tree
Hide file tree
Showing 16 changed files with 661 additions and 641 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ template "common.names.fullname" . }}
name: {{ template "common.names.fullname" . }}-server
namespace: {{ .Release.Namespace }}
labels:
service: server
{{- include "common.labels.standard" . | nindent 4 }}
{{- with .Values.deployment.extraLabels }}
{{- toYaml . | nindent 4 }}
Expand All @@ -13,14 +14,16 @@ metadata:
{{ toYaml . | nindent 4 }}
{{- end }}
spec:
replicas: {{ .Values.deployment.replicas }}
replicas: {{ .Values.deployment.serverReplicas }}
revisionHistoryLimit: 3
selector:
matchLabels:
service: server
{{- include "common.labels.matchLabels" . | nindent 6 }}
template:
metadata:
labels:
service: server
{{- include "common.labels.standard" . | nindent 8 }}
{{- with .Values.deployment.extraLabels }}
{{- toYaml . | nindent 8 }}
Expand Down Expand Up @@ -53,12 +56,20 @@ spec:
- name: PERMISSIONSAPI_SERVER_TRUSTED_PROXIES
value: "{{ join " " . }}"
{{- end }}
- name: PERMISSIONSAPI_OIDC_ISSUER
value: "{{ .Values.config.oidc.issuer }}"
- name: PERMISSIONSAPI_OIDC_JWKSURI
value: "{{ .Values.config.oidc.jwksuri }}"
{{- if .Values.config.oidc.issuer }}
{{- with .Values.config.oidc.audience }}
- name: PERMISSIONSAPI_OIDC_AUDIENCE
value: "{{ .Values.config.oidc.audience }}"
value: "{{ . }}"
{{- end }}
{{- with .Values.config.oidc.issuer }}
- name: PERMISSIONSAPI_OIDC_ISSUER
value: "{{ . }}"
{{- end }}
{{- with .Values.config.oidc.refreshTimeout }}
- name: PERMISSIONSAPI_OIDC_REFRESH_TIMEOUT
value: "{{ . }}"
{{- end }}
{{- end }}
- name: PERMISSIONSAPI_SPICEDB_ENDPOINT
value: "{{ .Values.config.spicedb.endpoint }}"
- name: PERMISSIONSAPI_SPICEDB_INSECURE
Expand Down
166 changes: 166 additions & 0 deletions chart/permissions-api/templates/deployment-worker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ template "common.names.fullname" . }}-worker
namespace: {{ .Release.Namespace }}
labels:
service: worker
{{- include "common.labels.standard" . | nindent 4 }}
{{- with .Values.deployment.extraLabels }}
{{- toYaml . | nindent 4 }}
{{- end }}
{{- with .Values.deployment.annotations }}
annotations:
{{ toYaml . | nindent 4 }}
{{- end }}
spec:
replicas: {{ .Values.deployment.workerReplicas }}
revisionHistoryLimit: 3
selector:
matchLabels:
service: worker
{{- include "common.labels.matchLabels" . | nindent 6 }}
template:
metadata:
labels:
service: worker
{{- include "common.labels.standard" . | nindent 8 }}
{{- with .Values.deployment.extraLabels }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.deployment.annotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
spec:
restartPolicy: Always
terminationGracePeriodSeconds: 30
{{- with .Values.deployment.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- if .Values.deployment.podSecurityContext }}
securityContext:
{{- toYaml .Values.deployment.podSecurityContext | nindent 8 }}
{{- end }}
containers:
- name: {{ include "common.names.name" . }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
args:
- worker
env:
- name: PERMISSIONSAPI_SERVER_LISTEN
value: ":{{ include "permapi.listenPort" . }}"
- name: PERMISSIONSAPI_SERVER_SHUTDOWN_GRACE_PERIOD
value: "{{ .Values.config.server.shutdownGracePeriod }}"
{{- with .Values.config.server.trustedProxies }}
- name: PERMISSIONSAPI_SERVER_TRUSTED_PROXIES
value: "{{ join " " . }}"
{{- end }}
- name: PERMISSIONSAPI_EVENTS_SUBSCRIBER_URL
value: "{{ .Values.config.events.url }}"
- name: PERMISSIONSAPI_EVENTS_SUBSCRIBER_TIMEOUT
value: "{{ .Values.config.events.timeout }}"
- name: PERMISSIONSAPI_EVENTS_SUBSCRIBER_PREFIX
value: "{{ .Values.config.events.prefix }}"
- name: PERMISSIONSAPI_EVENTS_SUBSCRIBER_QUEUEGROUP
value: "{{ .Values.config.events.queueGroup }}"
{{- with .Values.config.events.topics }}
- name: PERMISSIONSAPI_EVENTS_TOPICS
value: "{{ join " " . }}"
{{- end }}
{{- if .Values.config.events.nats.credsSecretName }}
- name: PERMISSIONSAPI_EVENTS_SUBSCRIBER_NATS_CREDSFILE
value: "{{ .Values.config.events.nats.credsFile }}"
{{- end }}
{{- if .Values.config.events.nats.token }}
- name: PERMISSIONSAPI_EVENTS_SUBSCRIBER_NATS_TOKEN
value: "{{ .Values.config.events.token }}"
{{- end }}
{{- if .Values.config.oidc.issuer }}
{{- with .Values.config.oidc.audience }}
- name: PERMISSIONSAPI_OIDC_AUDIENCE
value: "{{ . }}"
{{- end }}
{{- with .Values.config.oidc.issuer }}
- name: PERMISSIONSAPI_OIDC_ISSUER
value: "{{ . }}"
{{- end }}
{{- with .Values.config.oidc.refreshTimeout }}
- name: PERMISSIONSAPI_OIDC_REFRESH_TIMEOUT
value: "{{ . }}"
{{- end }}
{{- end }}
- name: PERMISSIONSAPI_SPICEDB_ENDPOINT
value: "{{ .Values.config.spicedb.endpoint }}"
- name: PERMISSIONSAPI_SPICEDB_INSECURE
value: "{{ .Values.config.spicedb.insecure }}"
- name: PERMISSIONSAPI_SPICEDB_VERIFYCA
value: "{{ .Values.config.spicedb.verifyCA }}"
- name: PERMISSIONSAPI_TRACING_ENABLED
value: "{{ .Values.config.tracing.enabled }}"
- name: PERMISSIONSAPI_TRACING_PROVIDER
value: "{{ .Values.config.tracing.provider }}"
- name: PERMISSIONSAPI_TRACING_ENVIRONMENT
value: "{{ .Values.config.tracing.environment }}"
{{- if .Values.config.spicedb.caSecretName }}
- name: SSL_CERT_DIR
value: "/etc/ssl/spicedb"
{{- end }}
{{- if eq .Values.config.tracing.provider "jaeger" }}
- name: PERMISSIONSAPI_TRACING_JAEGER_ENDPOINT
value: "{{ .Values.config.tracing.jaeger.endpoint }}"
- name: PERMISSIONSAPI_TRACING_JAEGER_USER
value: "{{ .Values.config.tracing.jaeger.user }}"
- name: PERMISSIONSAPI_TRACING_JAEGER_PASSWORD
value: "{{ .Values.config.tracing.jaeger.password }}"
{{- end }}
{{- if eq .Values.config.tracing.provider "otlp" }}
- name: PERMISSIONSAPI_TRACING_OTLP_ENDPOINT
value: "{{ .Values.config.tracing.otlp.endpoint }}"
- name: PERMISSIONSAPI_TRACING_OTLP_INSECURE
value: "{{ .Values.config.tracing.otlp.insecure }}"
- name: PERMISSIONSAPI_TRACING_OTLP_CERTIFICATE
value: "{{ .Values.config.tracing.otlp.certificate }}"
{{- end }}
envFrom:
- secretRef:
name: "{{ .Values.config.spicedb.pskSecretName }}"
ports:
- name: http
containerPort: {{ include "permapi.listenPort" . }}
protocol: TCP
livenessProbe:
httpGet:
path: /livez
port: http
readinessProbe:
httpGet:
path: /readyz
port: http
{{- with .Values.deployment.resources }}
resources:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.deployment.nodeSelector }}
volumeMounts:
- name: spicedb-ca
mountPath: /etc/ssl/spicedb/
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.deployment.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.deployment.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.config.spicedb.caSecretName }}
volumes:
- name: spicedb-ca
secret:
secretName: {{ . }}
{{- end }}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
apiVersion: v1
kind: Service
metadata:
name: {{ include "common.names.fullname" . }}
name: {{ include "common.names.fullname" . }}-server
namespace: {{ .Release.Namespace }}
labels:
service: server
{{- include "common.labels.standard" . | nindent 4 }}
{{- with .Values.service.annotations }}
annotations:
Expand All @@ -17,4 +18,5 @@ spec:
protocol: TCP
name: http
selector:
service: server
{{- include "common.labels.matchLabels" . | nindent 4 }}
31 changes: 27 additions & 4 deletions chart/permissions-api/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ config:
oidc:
# issuer is the OIDC issuer to use when validating Bearer JWTs
issuer: ""
# jwksuri is the URI containing the JWKS for the Bearer JWTs' signing keys
jwksuri: ""
# audience is the audience to validate when accepting requests
audience: ""
# refreshTimeout is the limit a refresh of jwks can take
refreshTimeout: ""
spicedb:
# endpoint is the address of the SpiceDB cluster containing permissions-api data
endpoint: ""
Expand All @@ -35,6 +35,27 @@ config:
caSecretName: ""
# pskSecretName is the name of the secret containing the pre-shared key for connecting to SpiceDB. This must contain a key, PERMISSIONSAPI_SPICEDB_KEY, that contains the PSK value
pskSecretName: ""

events:
# url is the event server connection url
url: ""
# timeout is event connection timeout
timeout: ""
# prefix is the subscribe event prefix
prefix: ""
# queueGroup defines the events queue group
queueGroup: ""
# topics are the list of topics to subscribe to
topics: []
# nats contains nats specific configuration
nats:
# token is the nats auth token
token: ""
# credsSecretName is the secret to load the creds auth file from
credsSecretName: ""
# credsFile is the location to read the creds file from
credsFile: "/nats/creds"

tracing:
# enabled is true if OpenTelemetry tracing should be enabled for permissions-api
enabled: false
Expand All @@ -54,8 +75,10 @@ config:
insecure: false

deployment:
# replicas is the number of replicas to deploy in the deployment
replicas: 1
# serverReplicas is the number of replicas to deploy in the deployment for the server container
serverReplicas: 1
# workerReplicas is the number of replicas to deploy in the deployment for the worker container
workerReplicas: 1
# imagePullSecrets is the list of image pull secrets to use when fetching the image
imagePullSecrets: []
# podSecurityContext is the pod security context for the deployment
Expand Down
31 changes: 17 additions & 14 deletions cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.infratographer.com/x/events"
"go.infratographer.com/x/otelx"
"go.infratographer.com/x/viperx"

"go.infratographer.com/permissions-api/internal/config"
"go.infratographer.com/permissions-api/internal/pubsub"
Expand All @@ -25,7 +27,10 @@ func init() {
rootCmd.AddCommand(workerCmd)

otelx.MustViperFlags(viper.GetViper(), workerCmd.Flags())
pubsub.MustViperFlags(viper.GetViper(), workerCmd.Flags())
events.MustViperFlagsForSubscriber(viper.GetViper(), workerCmd.Flags())

workerCmd.PersistentFlags().StringSlice("events-topics", []string{}, "event topics to subscribe to")
viperx.MustBindFlag(viper.GetViper(), "events.topics", workerCmd.PersistentFlags().Lookup("events-topics"))
}

func worker(ctx context.Context, cfg *config.AppConfig) {
Expand All @@ -41,20 +46,18 @@ func worker(ctx context.Context, cfg *config.AppConfig) {

engine := query.NewEngine("infratographer", spiceClient)

logger.Infow("client config", "client_config", cfg.PubSub)
subscriber, err := pubsub.NewSubscriber(ctx, cfg.Events.Subscriber, engine)
if err != nil {
logger.Fatalw("unable to initialize subscriber", "error", err)
}

client := pubsub.NewClient(
cfg.PubSub,
pubsub.WithQueryEngine(engine),
pubsub.WithResourceTypeNames(
[]string{
"tenant",
},
),
pubsub.WithLogger(logger),
)
for _, topic := range viper.GetStringSlice("events.topics") {
if err := subscriber.Subscribe(topic); err != nil {
logger.Fatalw("failed to subscribe to changes topic", "topic", topic, "error", err)
}
}

if err := client.Listen(); err != nil {
if err := subscriber.Listen(); err != nil {
logger.Fatalw("error listening for events", "error", err)
}

Expand All @@ -63,7 +66,7 @@ func worker(ctx context.Context, cfg *config.AppConfig) {

logger.Infof("received %s signal, stopping", sig)

err = client.Stop()
err = subscriber.Close()
if err != nil {
logger.Fatalw("error stopping NATS client", "error", err)
}
Expand Down
Loading

0 comments on commit 4651abd

Please sign in to comment.