diff --git a/rds/collector.go b/rds/collector.go
index 25af4d5..e2416b9 100644
--- a/rds/collector.go
+++ b/rds/collector.go
@@ -68,19 +68,17 @@ type DbCollector interface {
 }
 
 type Collector struct {
-    sess     *session.Session
-    region   string
-    instance rds.DBInstance
-    ip       *net.IPAddr
-
-    cloudWatchLogsApi *cloudwatchlogs.CloudWatchLogs
-
-    dbCollector DbCollector
-
-    logReader *LogReader
-    logParser *logparser.Parser
-
-    logger logger.Logger
+    sess              *session.Session
+    region            string
+    instance          rds.DBInstance
+    ip                *net.IPAddr
+    cloudWatchLogsApi *cloudwatchlogs.CloudWatchLogs
+    dbCollector       DbCollector
+    logReader         *LogReader
+    logParser         *logparser.Parser
+    logger            logger.Logger
+    metricsChan       chan prometheus.Metric
+    done              chan struct{}
 }
 
 func NewCollector(sess *session.Session, i *rds.DBInstance) (*Collector, error) {
@@ -171,71 +169,87 @@ func (c *Collector) Close() {
 }
 
 func (c *Collector) Collect(ch chan<- prometheus.Metric) {
-    i := c.instance
-
-    ch <- utils.Gauge(dStatus, 1, aws.StringValue(i.DBInstanceStatus))
-
-    ch <- utils.Gauge(dInfo, 1,
-        c.region,
-        aws.StringValue(i.AvailabilityZone),
-
-        aws.StringValue(i.Endpoint.Address),
-        c.ip.String(),
-        strconv.Itoa(int(aws.Int64Value(i.Endpoint.Port))),
-
-        aws.StringValue(i.Engine),
-        aws.StringValue(i.EngineVersion),
-
-        aws.StringValue(i.DBInstanceClass),
-        aws.StringValue(i.StorageType),
-
-        strconv.FormatBool(aws.BoolValue(i.MultiAZ)),
-
-        aws.StringValue(i.SecondaryAvailabilityZone),
-
-        utils.IdWithRegion(c.region, aws.StringValue(i.DBClusterIdentifier)),
-
-        utils.IdWithRegion(c.region, aws.StringValue(i.ReadReplicaSourceDBInstanceIdentifier)),
-    )
-
-    ch <- utils.Gauge(dAllocatedStorage, float64(aws.Int64Value(i.AllocatedStorage)))
-    ch <- utils.Gauge(dStorageAutoscalingThreshold, float64(aws.Int64Value(i.MaxAllocatedStorage)))
-    ch <- utils.Gauge(dStorageProvisionedIOPs, float64(aws.Int64Value(i.Iops)))
-    ch <- utils.Gauge(dBackupRetentionPeriod, float64(aws.Int64Value(i.BackupRetentionPeriod)))
-
-    for _, r := range i.ReadReplicaDBInstanceIdentifiers {
-        ch <- utils.Gauge(dReadReplicaInfo, float64(1), utils.IdWithRegion(c.region, aws.StringValue(r)))
-    }
-
-    wg := sync.WaitGroup{}
-
-    if aws.Int64Value(c.instance.MonitoringInterval) > 0 && c.instance.DbiResourceId != nil {
-        wg.Add(1)
-        go func() {
-            t := time.Now()
-            c.collectOsMetrics(ch)
-            c.logger.Info("os metrics collected in:", time.Since(t))
-            wg.Done()
-        }()
-    }
-
-    if c.dbCollector != nil {
-        wg.Add(1)
-        go func() {
-            t := time.Now()
-            c.dbCollector.Collect(ch)
-            c.logger.Info("db metrics collected in:", time.Since(t))
-            wg.Done()
-        }()
-    }
-
-    wg.Wait()
+    c.metricsChan = make(chan prometheus.Metric, 1000)
+    c.done = make(chan struct{})
+
+    go func() {
+        for metric := range c.metricsChan {
+            ch <- metric
+        }
+        close(c.done)
+    }()
+
+    var wg sync.WaitGroup
+    wg.Add(3)
+
+    go func() {
+        defer wg.Done()
+        c.collectInstanceInfo()
+    }()
+
+    if aws.Int64Value(c.instance.MonitoringInterval) > 0 && c.instance.DbiResourceId != nil {
+        go func() {
+            defer wg.Done()
+            t := time.Now()
+            c.collectOsMetrics(c.metricsChan)
+            c.logger.Info("os metrics collected in:", time.Since(t))
+        }()
+    } else {
+        wg.Done()
+    }
+
+    if c.dbCollector != nil {
+        go func() {
+            defer wg.Done()
+            t := time.Now()
+            c.dbCollector.Collect(c.metricsChan)
+            c.logger.Info("db metrics collected in:", time.Since(t))
+        }()
+    } else {
+        wg.Done()
+    }
+
+    wg.Wait()
+
+    if c.logParser != nil {
+        for _, lc := range c.logParser.GetCounters() {
+            c.metricsChan <- utils.Counter(dLogMessages, float64(lc.Messages), lc.Level.String(), lc.Hash, lc.Sample)
+        }
+    }
+
+    close(c.metricsChan)
+    <-c.done
+}
 
-    if c.logParser != nil {
-        for _, lc := range c.logParser.GetCounters() {
-            ch <- utils.Counter(dLogMessages, float64(lc.Messages), lc.Level.String(), lc.Hash, lc.Sample)
-        }
-    }
+func (c *Collector) collectInstanceInfo() {
+    i := c.instance
+
+    c.metricsChan <- utils.Gauge(dStatus, 1, aws.StringValue(i.DBInstanceStatus))
+
+    c.metricsChan <- utils.Gauge(dInfo, 1,
+        c.region,
+        aws.StringValue(i.AvailabilityZone),
+        aws.StringValue(i.Endpoint.Address),
+        c.ip.String(),
+        strconv.Itoa(int(aws.Int64Value(i.Endpoint.Port))),
+        aws.StringValue(i.Engine),
+        aws.StringValue(i.EngineVersion),
+        aws.StringValue(i.DBInstanceClass),
+        aws.StringValue(i.StorageType),
+        strconv.FormatBool(aws.BoolValue(i.MultiAZ)),
+        aws.StringValue(i.SecondaryAvailabilityZone),
+        utils.IdWithRegion(c.region, aws.StringValue(i.DBClusterIdentifier)),
+        utils.IdWithRegion(c.region, aws.StringValue(i.ReadReplicaSourceDBInstanceIdentifier)),
+    )
+
+    c.metricsChan <- utils.Gauge(dAllocatedStorage, float64(aws.Int64Value(i.AllocatedStorage)))
+    c.metricsChan <- utils.Gauge(dStorageAutoscalingThreshold, float64(aws.Int64Value(i.MaxAllocatedStorage)))
+    c.metricsChan <- utils.Gauge(dStorageProvisionedIOPs, float64(aws.Int64Value(i.Iops)))
+    c.metricsChan <- utils.Gauge(dBackupRetentionPeriod, float64(aws.Int64Value(i.BackupRetentionPeriod)))
+
+    for _, r := range i.ReadReplicaDBInstanceIdentifiers {
+        c.metricsChan <- utils.Gauge(dReadReplicaInfo, float64(1), utils.IdWithRegion(c.region, aws.StringValue(r)))
+    }
 }
 
 func (c *Collector) Describe(ch chan<- *prometheus.Desc) {
diff --git a/rds/discovery.go b/rds/discovery.go
index 205e1e1..81ef7a2 100644
--- a/rds/discovery.go
+++ b/rds/discovery.go
@@ -10,101 +10,123 @@ import (
     "github.com/coroot/logger"
     "github.com/prometheus/client_golang/prometheus"
     "time"
+    "sync"
 )
 
 type Discoverer struct {
-    reg prometheus.Registerer
-
-    awsSession *session.Session
-
-    instances map[string]*Collector
-
-    logger logger.Logger
+    reg        prometheus.Registerer
+    awsSession *session.Session
+    instances  sync.Map
+    logger     logger.Logger
+    workerPool *utils.WorkerPool
 }
 
 func NewDiscoverer(reg prometheus.Registerer, awsSession *session.Session) *Discoverer {
-    d := &Discoverer{
-        reg:        reg,
-        awsSession: awsSession,
-        instances:  map[string]*Collector{},
-        logger:     logger.NewKlog(""),
-    }
-    return d
+    return &Discoverer{
+        reg:        reg,
+        awsSession: awsSession,
+        instances:  sync.Map{},
+        logger:     logger.NewKlog(""),
+        workerPool: utils.NewWorkerPool(10), // Adjust the number of workers as needed
+    }
 }
 
 func (d *Discoverer) Run() {
-    api := rds.New(d.awsSession)
-
-    if err := d.refresh(api); err != nil {
-        d.logger.Warning(err)
-    }
-
-    ticker := time.Tick(*flags.DiscoveryInterval)
-    for range ticker {
-        if err := d.refresh(api); err != nil {
-            d.logger.Warning(err)
-        }
-    }
+    api := rds.New(d.awsSession)
+
+    if err := d.refresh(api); err != nil {
+        d.logger.Warning(err)
+    }
+
+    ticker := time.NewTicker(*flags.DiscoveryInterval)
+    defer ticker.Stop()
+
+    for range ticker.C {
+        if err := d.refresh(api); err != nil {
+            d.logger.Warning(err)
+        }
+    }
 }
 
 func (d *Discoverer) refresh(api rdsiface.RDSAPI) error {
-    t := time.Now()
-    defer func() {
-        d.logger.Info("instances refreshed in:", time.Since(t))
-    }()
-
-    output, err := api.DescribeDBInstances(nil)
-    if err != nil {
-        return err
-    }
-
-    actualInstances := map[string]bool{}
-    for _, dbInstance := range output.DBInstances {
-        if dbInstance.Endpoint == nil {
-            continue
-        }
-        id := aws.StringValue(dbInstance.DBInstanceIdentifier)
-        input := &rds.ListTagsForResourceInput{ResourceName: dbInstance.DBInstanceArn}
-        tags := map[string]string{}
-        o, err := api.ListTagsForResource(input)
-        if err != nil {
-            d.logger.Error(err)
-        } else {
-            for _, t := range o.TagList {
-                tags[aws.StringValue(t.Key)] = aws.StringValue(t.Value)
-            }
-        }
-        if utils.Filtered(*flags.RdsFilters, tags) {
-            d.logger.Infof("RDS instance %s (tags: %s) was skipped according to the tag-based filters: %s", id, tags, *flags.RdsFilters)
-            continue
-        }
-        actualInstances[id] = true
-        i, ok := d.instances[id]
-        if !ok {
-            d.logger.Info("new DB instance found:", id)
-            i, err = NewCollector(d.awsSession, dbInstance)
-            if err != nil {
-                d.logger.Warning("failed to init RDS collector:", err)
-                continue
-            }
-            if err := d.wrappedReg(id).Register(i); err != nil {
-                d.logger.Warning(err)
-                continue
-            }
-            d.instances[id] = i
-        }
-        i.update(dbInstance)
-    }
-
-    for id, i := range d.instances {
-        if !actualInstances[id] {
-            d.logger.Info("instance no longer exists:", id)
-            d.wrappedReg(id).Unregister(i)
-            i.Close()
-            delete(d.instances, id)
-        }
-    }
-    return nil
+    t := time.Now()
+    defer func() {
+        d.logger.Info("instances refreshed in:", time.Since(t))
+    }()
+
+    output, err := api.DescribeDBInstances(nil)
+    if err != nil {
+        return err
+    }
+
+    actualInstances := make(map[string]bool)
+    var wg sync.WaitGroup
+    for _, dbInstance := range output.DBInstances {
+        wg.Add(1)
+        d.workerPool.Submit(func() {
+            defer wg.Done()
+            id := aws.StringValue(dbInstance.DBInstanceIdentifier)
+            actualInstances[id] = true
+            d.processInstance(api, dbInstance)
+        })
+    }
+    wg.Wait()
+
+    d.instances.Range(func(key, value interface{}) bool {
+        id := key.(string)
+        if _, exists := actualInstances[id]; !exists {
+            d.logger.Info("instance no longer exists:", id)
+            d.wrappedReg(id).Unregister(value.(*Collector))
+            value.(*Collector).Close()
+            d.instances.Delete(id)
+        }
+        return true
+    })
+
+    return nil
+}
+
+func (d *Discoverer) processInstance(api rdsiface.RDSAPI, dbInstance *rds.DBInstance) {
+    if dbInstance.Endpoint == nil {
+        return
+    }
+    id := aws.StringValue(dbInstance.DBInstanceIdentifier)
+    tags := d.getTags(api, dbInstance.DBInstanceArn)
+    if utils.Filtered(*flags.RdsFilters, tags) {
+        d.logger.Infof("RDS instance %s (tags: %s) was skipped according to the tag-based filters: %s", id, tags, *flags.RdsFilters)
+        return
+    }
+
+    value, ok := d.instances.Load(id)
+    if !ok {
+        d.logger.Info("new DB instance found:", id)
+        collector, err := NewCollector(d.awsSession, dbInstance)
+        if err != nil {
+            d.logger.Warning("failed to init RDS collector:", err)
+            return
+        }
+        if err := d.wrappedReg(id).Register(collector); err != nil {
+            d.logger.Warning(err)
+            return
+        }
+        d.instances.Store(id, collector)
+    } else {
+        value.(*Collector).update(dbInstance)
+    }
+}
+
+func (d *Discoverer) getTags(api rdsiface.RDSAPI, arn *string) map[string]string {
+    input := &rds.ListTagsForResourceInput{ResourceName: arn}
+    tags := map[string]string{}
+    o, err := api.ListTagsForResource(input)
+    if err != nil {
+        d.logger.Error(err)
+    } else {
+        for _, t := range o.TagList {
+            tags[aws.StringValue(t.Key)] = aws.StringValue(t.Value)
+        }
+    }
+    return tags
 }
 
 func (d *Discoverer) wrappedReg(instanceId string) prometheus.Registerer {
diff --git a/utils/worker_pool.go b/utils/worker_pool.go
new file mode 100644
index 0000000..02e9595
--- /dev/null
+++ b/utils/worker_pool.go
@@ -0,0 +1,41 @@
+package utils
+
+import (
+    "sync"
+)
+
+type WorkerPool struct {
+    workerCount int
+    jobChannel  chan func()
+    wg          sync.WaitGroup
+}
+
+func NewWorkerPool(workerCount int) *WorkerPool {
+    pool := &WorkerPool{
+        workerCount: workerCount,
+        jobChannel:  make(chan func(), workerCount),
+    }
+    pool.start()
+    return pool
+}
+
+func (p *WorkerPool) start() {
+    for i := 0; i < p.workerCount; i++ {
+        p.wg.Add(1)
+        go func() {
+            defer p.wg.Done()
+            for job := range p.jobChannel {
+                job()
+            }
+        }()
+    }
+}
+
+func (p *WorkerPool) Submit(job func()) {
+    p.jobChannel <- job
+}
+
+func (p *WorkerPool) Wait() {
+    close(p.jobChannel)
+    p.wg.Wait()
+}
\ No newline at end of file
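
A minimal usage sketch for the new utils.WorkerPool introduced above (the import path and the job body below are illustrative assumptions, not part of the change): jobs are fanned out with Submit and awaited through a caller-side sync.WaitGroup, mirroring refresh(); Wait closes the job channel, so a pool must not receive further Submit calls after Wait returns, and any state shared between jobs (such as the actualInstances map in refresh) needs its own locking.

package main

import (
    "fmt"
    "sync"

    "github.com/coroot/aws-agent/utils" // assumed import path for the utils package above
)

func main() {
    pool := utils.NewWorkerPool(4) // at most 4 jobs run concurrently

    results := map[int]bool{} // shared state must be guarded by the caller; the pool does not do it
    var mu sync.Mutex

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        i := i // re-declare the loop variable so each closure captures its own copy (needed before Go 1.22)
        wg.Add(1)
        pool.Submit(func() {
            defer wg.Done()
            mu.Lock()
            results[i] = true
            mu.Unlock()
        })
    }
    wg.Wait() // all submitted jobs are done; the pool can still accept more work

    pool.Wait() // closes the job channel and stops the workers; do not Submit after this
    fmt.Println("processed", len(results), "jobs")
}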