Skip to content

Commit e3aec05

Browse files
committed
Main scaffolding
1 parent 44f1b35 commit e3aec05

File tree

6 files changed

+271
-0
lines changed

6 files changed

+271
-0
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
.AppleDesktop
66
.AppleDouble
77
atlassian-ide-plugin.xml
8+
bin
89
cmake-build-*/
910
.com.apple.timemachine.donotpresent
1011
com_crashlytics_export_strings.xml
@@ -21,6 +22,7 @@ fabric.properties
2122
.fseventsd
2223
.fuse_hidden*
2324
gen
25+
go.sum
2426
Icon
2527
.idea
2628
.idea_modules/

Makefile

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
aivenator:
2+
go build -o bin/aivenator cmd/aivenator/*.go
3+
4+
test:
5+
go test ./... -count=1

cmd/aivenator/main.go

+169
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"os/signal"
8+
"strings"
9+
"syscall"
10+
"time"
11+
12+
aivenatormetrics "github.com/nais/aivenator/pkg/metrics"
13+
"github.com/nais/liberator/pkg/apis/kafka.nais.io/v1"
14+
log "github.com/sirupsen/logrus"
15+
flag "github.com/spf13/pflag"
16+
"github.com/spf13/viper"
17+
"k8s.io/apimachinery/pkg/runtime"
18+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
19+
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
20+
ctrl "sigs.k8s.io/controller-runtime"
21+
"sigs.k8s.io/controller-runtime/pkg/manager"
22+
"sigs.k8s.io/controller-runtime/pkg/metrics"
23+
// +kubebuilder:scaffold:imports
24+
)
25+
26+
var scheme = runtime.NewScheme()
27+
28+
type QuitChannel chan error
29+
30+
const (
31+
ExitOK = iota
32+
ExitController
33+
ExitConfig
34+
ExitRuntime
35+
)
36+
37+
// Configuration options
38+
const (
39+
AivenToken = "aiven-token"
40+
KubernetesWriteRetryInterval = "kubernetes-write-retry-interval"
41+
LogFormat = "log-format"
42+
MetricsAddress = "metrics-address"
43+
Projects = "projects"
44+
SyncPeriod = "sync-period"
45+
)
46+
47+
const (
48+
LogFormatJSON = "json"
49+
LogFormatText = "text"
50+
)
51+
52+
func init() {
53+
54+
// Automatically read configuration options from environment variables.
55+
// i.e. --aiven-token will be configurable using AIVENATOR_AIVEN_TOKEN.
56+
viper.SetEnvPrefix("AIVENATOR")
57+
viper.AutomaticEnv()
58+
viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_", ".", "_"))
59+
60+
flag.String(AivenToken, "", "Administrator credentials for Aiven")
61+
flag.String(MetricsAddress, "127.0.0.1:8080", "The address the metric endpoint binds to.")
62+
flag.String(LogFormat, "text", "Log format, either 'text' or 'json'")
63+
flag.Duration(KubernetesWriteRetryInterval, time.Second*10, "Requeueing interval when Kubernetes writes fail")
64+
flag.Duration(SyncPeriod, time.Hour*1, "How often to re-synchronize all AivenApplication resources including credential rotation")
65+
flag.StringSlice(Projects, []string{"nav-integration-test"}, "List of projects allowed to operate on")
66+
67+
flag.Parse()
68+
69+
err := viper.BindPFlags(flag.CommandLine)
70+
if err != nil {
71+
panic(err)
72+
}
73+
}
74+
75+
func formatter(logFormat string) (log.Formatter, error) {
76+
switch logFormat {
77+
case LogFormatJSON:
78+
return &log.JSONFormatter{
79+
TimestampFormat: time.RFC3339Nano,
80+
DisableHTMLEscape: true,
81+
}, nil
82+
case LogFormatText:
83+
return &log.TextFormatter{
84+
FullTimestamp: true,
85+
TimestampFormat: time.RFC3339Nano,
86+
}, nil
87+
}
88+
return nil, fmt.Errorf("unsupported log format '%s'", logFormat)
89+
}
90+
91+
func main() {
92+
quit := make(QuitChannel, 1)
93+
signals := make(chan os.Signal, 1)
94+
95+
logger := log.New()
96+
logfmt, err := formatter(viper.GetString(LogFormat))
97+
if err != nil {
98+
logger.Error(err)
99+
os.Exit(ExitConfig)
100+
}
101+
102+
logger.SetFormatter(logfmt)
103+
104+
syncPeriod := viper.GetDuration(SyncPeriod)
105+
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
106+
SyncPeriod: &syncPeriod,
107+
Scheme: scheme,
108+
MetricsBindAddress: viper.GetString(MetricsAddress),
109+
})
110+
111+
if err != nil {
112+
logger.Println(err)
113+
os.Exit(ExitController)
114+
}
115+
116+
logger.Info("Aivenator running")
117+
118+
go credentials(quit, logger, mgr)
119+
120+
go janitor(quit, logger, mgr)
121+
122+
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
123+
124+
go func() {
125+
for {
126+
select {
127+
case err := <-quit:
128+
logger.Errorf("terminating unexpectedly: %s", err)
129+
os.Exit(ExitRuntime)
130+
case sig := <-signals:
131+
logger.Infof("exiting due to signal: %s", strings.ToUpper(sig.String()))
132+
os.Exit(ExitOK)
133+
}
134+
}
135+
}()
136+
137+
terminator := context.Background()
138+
if err := mgr.Start(terminator); err != nil {
139+
quit <- fmt.Errorf("manager stopped unexpectedly: %s", err)
140+
return
141+
}
142+
143+
quit <- fmt.Errorf("manager has stopped")
144+
}
145+
146+
// TODO: Finds secrets that are no longer in use and cleans up associated service user before deleting secret
147+
func janitor(quit QuitChannel, logger *log.Logger, mgr manager.Manager) {
148+
149+
}
150+
151+
// TODO: Reconcile AivenApplications
152+
func credentials(quit QuitChannel, logger *log.Logger, mgr manager.Manager) {
153+
154+
}
155+
156+
func init() {
157+
err := clientgoscheme.AddToScheme(scheme)
158+
if err != nil {
159+
panic(err)
160+
}
161+
162+
err = kafka_nais_io_v1.AddToScheme(scheme)
163+
if err != nil {
164+
panic(err)
165+
}
166+
167+
aivenatormetrics.Register(metrics.Registry)
168+
// +kubebuilder:scaffold:scheme
169+
}

go.mod

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
module github.com/nais/aivenator
2+
3+
go 1.15
4+
5+
require (
6+
github.com/aiven/aiven-go-client v1.5.11
7+
github.com/nais/liberator v0.0.0-20210421120235-5f3edcf81f86
8+
github.com/prometheus/client_golang v1.7.1
9+
github.com/sirupsen/logrus v1.8.1
10+
github.com/spf13/pflag v1.0.5
11+
github.com/spf13/viper v1.7.1
12+
k8s.io/apimachinery v0.21.0
13+
k8s.io/client-go v0.20.2
14+
sigs.k8s.io/controller-runtime v0.8.3
15+
)
16+
17+
replace github.com/nais/liberator => ../liberator

main.go

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package aivenator

pkg/metrics/metrics.go

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package metrics
2+
3+
import (
4+
"strconv"
5+
"time"
6+
7+
"github.com/aiven/aiven-go-client"
8+
"github.com/prometheus/client_golang/prometheus"
9+
)
10+
11+
const (
12+
Namespace = "aivenator"
13+
14+
LabelAivenOperation = "operation"
15+
LabelNamespace = "namespace"
16+
LabelPool = "pool"
17+
LabelResourceType = "resource_type"
18+
LabelStatus = "status"
19+
LabelSyncState = "synchronization_state"
20+
)
21+
22+
var (
23+
TopicsProcessed = prometheus.NewCounterVec(prometheus.CounterOpts{
24+
Name: "aiven_applications_processed",
25+
Namespace: Namespace,
26+
Help: "number of applications synchronized with aiven",
27+
}, []string{LabelSyncState, LabelPool})
28+
29+
ServiceUsers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
30+
Name: "service_users",
31+
Namespace: Namespace,
32+
Help: "number of service users",
33+
}, []string{LabelPool})
34+
35+
AivenLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
36+
Name: "aiven_latency",
37+
Namespace: Namespace,
38+
Help: "latency in aiven api operations",
39+
Buckets: prometheus.LinearBuckets(0.05, 0.05, 100),
40+
}, []string{LabelAivenOperation, LabelStatus, LabelPool})
41+
42+
KubernetesResourcesWritten = prometheus.NewCounterVec(prometheus.CounterOpts{
43+
Name: "kubernetes_resources_written",
44+
Namespace: Namespace,
45+
Help: "number of kubernetes resources written to the cluster",
46+
}, []string{LabelNamespace, LabelResourceType})
47+
)
48+
49+
func ObserveAivenLatency(operation, pool string, fun func() error) error {
50+
timer := time.Now()
51+
err := fun()
52+
used := time.Now().Sub(timer)
53+
status := 200
54+
if err != nil {
55+
aivenErr, ok := err.(aiven.Error)
56+
if ok {
57+
status = aivenErr.Status
58+
} else {
59+
status = 0
60+
}
61+
}
62+
AivenLatency.With(prometheus.Labels{
63+
LabelAivenOperation: operation,
64+
LabelPool: pool,
65+
LabelStatus: strconv.Itoa(status),
66+
}).Observe(used.Seconds())
67+
return err
68+
}
69+
70+
func Register(registry prometheus.Registerer) {
71+
registry.MustRegister(
72+
AivenLatency,
73+
KubernetesResourcesWritten,
74+
ServiceUsers,
75+
TopicsProcessed,
76+
)
77+
}

0 commit comments

Comments
 (0)