Skip to content

Commit

Permalink
Merge pull request #247 from practo/multiple-svv-views
Browse files Browse the repository at this point in the history
Throttling support for all redshift databases
  • Loading branch information
alok87 authored Jul 29, 2021
2 parents c1947e4 + f2ffb47 commit aeae0df
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 15 deletions.
1 change: 1 addition & 0 deletions REDSHIFTSINK.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ CREATE SCHEMA redshiftsink_operator;
Please change the below two in the SQL below, before running it to create the view. View [source](https://github.com/awslabs/amazon-redshift-utils/blob/184c2ba7fd9d497027a831ca72e08fe09e79fd0b/src/AdminViews/v_get_tbl_scan_frequency.sql)
1. `AND s.userid != 100` with the user id(s) of the redshiftsink user
2. `AND s.starttime > GETDATE() - interval '3 day'` with the time window you want to consider a table is in use or not.
3. Create the following view in every Redshift database you need, then list those databases in the operator flag `--databases=`. This is required so that the operator can query the view in each of those databases.

```sql
CREATE OR REPLACE VIEW redshiftsink_operator.scan_query_total AS
Expand Down
45 changes: 37 additions & 8 deletions cmd/redshiftsink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,27 @@ func init() {
// +kubebuilder:scaffold:scheme
}

// parseDatabase splits the comma-separated --databases flag value into a
// slice of database-name pointers, one per Redshift connection to open.
//
// A non-empty input yields one pointer per supplied name. An empty input
// yields a single nil entry, which callers treat as "use the default
// database from the secret/config".
func parseDatabase(databases string) []*string {
	var dbs []*string

	if databases == "" {
		// No databases supplied: signal the default db from config.
		return append(dbs, nil)
	}

	supplied := strings.Split(databases, ",")
	for i := range supplied {
		// Take the address of the slice element (not the loop variable)
		// so each entry points at a distinct string.
		dbs = append(dbs, &supplied[i])
	}

	return dbs
}

func main() {
rand.Seed(time.Now().UnixNano())

var enableLeaderElection, collectRedshiftMetrics bool
var batcherImage, loaderImage, secretRefName, secretRefNamespace, kafkaVersion, metricsAddr, allowedRsks, prometheusURL string
var batcherImage, loaderImage, secretRefName, secretRefNamespace string
var kafkaVersion, metricsAddr, allowedRsks, prometheusURL, databases string
var redshiftMaxOpenConns, redshiftMaxIdleConns int
flag.StringVar(&batcherImage, "default-batcher-image", "practodev/redshiftbatcher:v1.0.0-beta.1", "image to use for the redshiftbatcher")
flag.StringVar(&loaderImage, "default-loader-image", "practodev/redshiftloader:v1.0.0-beta.1", "image to use for the redshiftloader")
Expand All @@ -72,6 +88,7 @@ func main() {
flag.IntVar(&redshiftMaxIdleConns, "default-redshift-max-idle-conns", 2, "the maximum number of idle connections allowed to redshift per redshiftsink resource")
flag.StringVar(&allowedRsks, "allowed-rsks", "", "comma separated list of names of rsk resources to allow, if empty all rsk resources are allowed")
flag.StringVar(&prometheusURL, "prometheus-url", "", "optional, giving prometheus makes the operator enable new features using time series data. Features: loader throttling, resetting offsets of 0 throughput topics.")
flag.StringVar(&databases, "databases", "", "comma separated list of all redshift databases to query for redshiftsink_operator.scan_query_total view. This is required for throttling support. Please note: the view should be manually created beforehand for all the specified databases.")
flag.Parse()

ctrl.SetLogger(klogr.New())
Expand Down Expand Up @@ -148,17 +165,29 @@ func main() {

ctx, cancel := context.WithCancel(ctrl.SetupSignalHandler())
defer cancel()

setupLog.Info("Configuring Redshift exporter...")
redshiftClient, err := controllers.NewRedshiftConn(uncachedClient, secretRefName, secretRefNamespace)
if err != nil {
setupLog.Error(err, "problem initializing redshift connection")
os.Exit(1)
}
redshiftCollector := redshift.NewRedshiftCollector(redshiftClient)

// collect redshift metrics for all databases
wg := &sync.WaitGroup{}
dbs := parseDatabase(databases)
redshiftClients := []*redshift.Redshift{}
for _, database := range dbs {
client, err := controllers.NewRedshiftConn(uncachedClient,
secretRefName,
secretRefNamespace,
database,
)
if err != nil {
setupLog.Error(err, "problem initializing redshift connection")
os.Exit(1)
}
redshiftClients = append(redshiftClients, client)
}

redshiftCollector := redshift.NewRedshiftCollector(redshiftClients)
wg.Add(1)
go redshiftCollector.Fetch(ctx, wg)

metrics.Registry.MustRegister(redshiftCollector)

setupLog.Info("Starting Operator...")
Expand Down
1 change: 1 addition & 0 deletions config/operator/redshiftsink_operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ spec:
- --allowed-rsks=
- --prometheus-url=
- --collect-redshift-metrics=false
- --databases=
resources:
limits:
cpu: 300m
Expand Down
4 changes: 4 additions & 0 deletions controllers/redshift_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ func NewRedshiftConn(
client client.Client,
secretName,
secretNamespace string,
database *string,
) (
*redshift.Redshift,
error,
Expand All @@ -23,6 +24,9 @@ func NewRedshiftConn(
for key, value := range k8sSecret.Data {
secret[key] = string(value)
}
if database != nil {
secret["redshiftDatabase"] = *database
}

return NewRedshiftConnection(secret, "")
}
Expand Down
19 changes: 12 additions & 7 deletions pkg/redshift/redshift_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ var (
)

type RedshiftCollector struct {
client *Redshift
clients []*Redshift
queryTotalMetric *prometheus.Desc

ready bool
queryTotal sync.Map
}

func NewRedshiftCollector(client *Redshift) *RedshiftCollector {
func NewRedshiftCollector(clients []*Redshift) *RedshiftCollector {
return &RedshiftCollector{
client: client,
clients: clients,
queryTotalMetric: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, SubSystemScan, "query_total"),
"Total number of redshift queries executed",
Expand All @@ -35,12 +35,17 @@ func NewRedshiftCollector(client *Redshift) *RedshiftCollector {
}

func (c *RedshiftCollector) updateQueryTotal(ctx context.Context) {
queryTotalRows, err := c.client.ScanQueryTotal(ctx)
if err != nil {
klog.Fatalf("Redshift Collector shutdown due to error: %v", err)
var rows []QueryTotalRow
for i, client := range c.clients {
klog.V(3).Infof("fetching query_total for database:%v", i)
dbRows, err := client.ScanQueryTotal(ctx)
if err != nil {
klog.Fatalf("Redshift Collector shutdown due to error: %v", err)
}
rows = append(rows, dbRows...)
}

c.queryTotal.Store("", queryTotalRows)
c.queryTotal.Store("", rows)
}

func (c *RedshiftCollector) Fetch(ctx context.Context, wg *sync.WaitGroup) {
Expand Down

0 comments on commit aeae0df

Please sign in to comment.