Add batching support to metrics-exporter.
tbarker25 committed Jan 24, 2022
1 parent 7e5a692 commit 2b05a79
Showing 1 changed file with 164 additions and 41 deletions.
205 changes: 164 additions & 41 deletions exporter/collector/metricsexporter.go
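
The change implements a standard batching pipeline: PushMetrics produces time series onto a channel, a single consumer goroutine accumulates them in a pending slice, and the batch is flushed when it reaches sendBatchSize or when batchTimeout elapses, whichever comes first. Stripped of the exporter specifics, the consumer loop has this shape (an illustrative sketch using only the standard time package, not code from the commit):

func runBatcher(in <-chan string, flush func([]string)) {
    const (
        batchTimeout  = 10 * time.Second
        sendBatchSize = 200
    )
    pending := make([]string, 0, sendBatchSize)
    timer := time.NewTimer(batchTimeout)
    for {
        select {
        case item := <-in:
            // Accumulate until the batch is full, then flush early.
            pending = append(pending, item)
            if len(pending) >= sendBatchSize {
                flush(pending)
                pending = pending[:0]
            }
        case <-timer.C:
            // Flush whatever arrived during the last interval.
            if len(pending) > 0 {
                flush(pending)
                pending = pending[:0]
            }
            timer.Reset(batchTimeout)
        }
    }
}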
@@ -25,6 +25,7 @@ import (
     "net/url"
     "path"
     "strings"
+    "sync"
     "time"
     "unicode"

@@ -39,6 +40,7 @@ import (
     metricpb "google.golang.org/genproto/googleapis/api/metric"
     monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
     monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
+    "google.golang.org/grpc/status"
     "google.golang.org/protobuf/types/known/anypb"
     "google.golang.org/protobuf/types/known/timestamppb"
 )
@@ -55,10 +57,22 @@ type MetricsExporter struct {
     client *monitoring.MetricClient
     obs selfObservability
     mapper metricMapper
-    // A channel that receives metric descriptor and sends them to GCM once.
-    mds chan *metricpb.MetricDescriptor
-    // Channel that is closed when exportMetricDescriptorRunnner goroutine is finished
-    mdsDone chan struct{}
+
+    // tracks the currently running child tasks
+    goroutines sync.WaitGroup
+    // channel for signaling a graceful shutdown
+    shutdownC chan struct{}
+
+    // A channel that receives metric descriptors and sends them to GCM once
+    metricDescriptorC chan *metricpb.MetricDescriptor
+    // Tracks the metric descriptors that have already been sent to GCM
+    mdCache map[string]*metricpb.MetricDescriptor
+
+    // A channel that receives time series and exports them to GCM in batches
+    timeSeriesC chan *monitoringpb.TimeSeries
+    // stores the currently pending batch of time series
+    pendingTimeSerieses []*monitoringpb.TimeSeries
+    batchTimeoutTimer *time.Timer
 }

// metricMapper is the part that transforms metrics. Separate from MetricsExporter since it has
@@ -74,14 +88,26 @@ const (
     SummaryPercentileSuffix = "_summary_percentile"
 )

+const (
+    batchTimeout  = 10 * time.Second
+    sendBatchSize = 200
+)
+
 type labels map[string]string

 func (me *MetricsExporter) Shutdown(ctx context.Context) error {
-    close(me.mds)
+    // TODO: pass ctx to goroutines so that we can use its deadline
+    close(me.shutdownC)
+    c := make(chan struct{})
+    go func() {
+        // Wait until all goroutines are done.
+        me.goroutines.Wait()
+        close(c)
+    }()
     select {
-    case <-me.mdsDone:
     case <-ctx.Done():
-        me.obs.log.Error("Error waiting for async CreateMetricDescriptor calls to finish.", zap.Error(ctx.Err()))
+        me.obs.log.Error("Error waiting for async tasks to finish.", zap.Error(ctx.Err()))
+    case <-c:
     }
     return me.client.Close()
 }
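
Shutdown honors the caller's context, so a shutdown with a time budget is just a deadline context. A minimal usage sketch (the helper name and the five-second budget are illustrative, not part of the commit):

func shutdownWithTimeout(exp *MetricsExporter) error {
    // Give the runner goroutines five seconds to drain their channels;
    // past the deadline, Shutdown logs the error and closes the client.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    return exp.Shutdown(ctx)
}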
@@ -126,22 +152,28 @@ func NewGoogleCloudMetricsExporter(
         obs: selfObservability{log: log},
         mapper: metricMapper{cfg},
         // We create a buffered channel for metric descriptors.
-        // MetricDescritpors are asycnhronously sent and optimistic.
+        // MetricDescriptors are sent asynchronously and optimistically.
         // We only get Unit/Description/Display name from them, so it's ok
         // to drop / conserve resources for sending timeseries.
-        mds: make(chan *metricpb.MetricDescriptor, cfg.MetricConfig.CreateMetricDescriptorBufferSize),
-        mdsDone: make(chan struct{}),
+        metricDescriptorC: make(chan *metricpb.MetricDescriptor, cfg.MetricConfig.CreateMetricDescriptorBufferSize),
+        mdCache: make(map[string]*metricpb.MetricDescriptor),
+        timeSeriesC: make(chan *monitoringpb.TimeSeries),
+        shutdownC: make(chan struct{}, 1),
     }

     // Fire up the metric descriptor exporter.
+    mExp.goroutines.Add(1)
     go mExp.exportMetricDescriptorRunner()

+    // Fire up the time series exporter.
+    mExp.goroutines.Add(1)
+    go mExp.exportTimeSeriesRunner()
+
     return mExp, nil
 }

 // PushMetrics pushes pdata metrics to GCM, creating metric descriptors if necessary.
 func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) error {
-    timeSeries := make([]*monitoringpb.TimeSeries, 0, m.DataPointCount())
     rms := m.ResourceMetrics()
     for i := 0; i < rms.Len(); i++ {
         rm := rms.At(i)
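
The constructor gives the descriptor channel a buffer precisely so that PushMetrics (continued in the next hunk) can use a non-blocking send and drop descriptors when the buffer is full. In isolation the select/default idiom looks like this (standalone sketch, not code from the commit):

func trySend(ch chan<- int, v int) bool {
    select {
    case ch <- v:
        return true // buffer space was available (or a receiver was waiting)
    default:
        return false // channel full: drop instead of blocking the producer
    }
}

Note the contrast with the time series channel, which is unbuffered: sends to timeSeriesC block until the runner picks them up, so points are never silently dropped.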
@@ -156,57 +188,91 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) err
             mes := ilm.Metrics()
             for k := 0; k < mes.Len(); k++ {
                 metric := mes.At(k)
-                timeSeries = append(timeSeries, me.mapper.metricToTimeSeries(monitoredResource, metricLabels, metric)...)
+                for _, ts := range me.mapper.metricToTimeSeries(monitoredResource, metricLabels, metric) {
+                    me.timeSeriesC <- ts
+                }

                 // We only send metric descriptors if we're configured *and* we're not sending service timeseries.
                 if me.cfg.MetricConfig.SkipCreateMetricDescriptor || me.cfg.MetricConfig.CreateServiceTimeSeries {
                     continue
                 }

                 for _, md := range me.mapper.metricDescriptor(metric) {
-                    if md != nil {
-                        select {
-                        case me.mds <- md:
-                        default:
-                            // Ignore drops, we'll catch descriptor next time around.
-                        }
+                    if md == nil {
+                        continue
+                    }
+                    select {
+                    case me.metricDescriptorC <- md:
+                    default:
+                        // Ignore drops; we'll catch the descriptor next time around.
                     }
                 }
             }
         }
     }

+    // TODO: self observability
+    return nil
 }

+func (me *MetricsExporter) exportPendingTimeSerieses(ctx context.Context) {
+    var sendSize int
+    if len(me.pendingTimeSerieses) < sendBatchSize {
+        sendSize = len(me.pendingTimeSerieses)
+    } else {
+        sendSize = sendBatchSize
+    }
+
+    var ts []*monitoringpb.TimeSeries
+    ts, me.pendingTimeSerieses = me.pendingTimeSerieses[:sendSize], me.pendingTimeSerieses[sendSize:]
+
+    var err error
     if me.cfg.MetricConfig.CreateServiceTimeSeries {
-        err := me.createServiceTimeSeries(ctx, timeSeries)
-        recordPointCount(ctx, len(timeSeries), m.DataPointCount()-len(timeSeries), err)
-        return err
+        err = me.createServiceTimeSeries(ctx, ts)
+    } else {
+        err = me.createTimeSeries(ctx, ts)
     }
-    err := me.createTimeSeries(ctx, timeSeries)
-    recordPointCount(ctx, len(timeSeries), m.DataPointCount()-len(timeSeries), err)
-    return err
+
+    var st string
+    if err == nil {
+        st = "OK"
+    } else if s, ok := status.FromError(err); ok {
+        st = statusCodeToString(s)
+    } else {
+        st = "UNKNOWN"
+    }
+
+    recordPointCountDataPoint(ctx, len(ts), st)
+    if err != nil {
+        me.obs.log.Error("could not export time series to GCM", zap.Error(err))
+    }
+}

 // Reads metric descriptors from the metric descriptor channel and reports each one to GCM once.
 func (me *MetricsExporter) exportMetricDescriptorRunner() {
-    mdCache := make(map[string]*metricpb.MetricDescriptor)
+    defer me.goroutines.Done()

     // We iterate over all metric descriptors until the shutdown channel is closed.
     // Note: if we get terminated, this will still attempt to export all descriptors
     // prior to shutdown.
-    for md := range me.mds {
-        // Not yet sent, now we sent it.
-        if mdCache[md.Type] == nil {
-            err := me.exportMetricDescriptor(context.TODO(), md)
-            // TODO: Log-once on error, per metric descriptor?
-            if err != nil {
-                me.obs.log.Error("Unable to send metric descriptor.", zap.Error(err), zap.Any("metric_descriptor", md))
-                continue
+    for {
+        select {
+        case <-me.shutdownC:
+            for {
+                // We are shutting down. Publish all the pending
+                // items on the channel before we stop.
+                select {
+                case md := <-me.metricDescriptorC:
+                    me.exportMetricDescriptor(md)
+                default:
+                    goto DONE
+                }
             }
-            mdCache[md.Type] = md
+        DONE:
+            // Return and continue graceful shutdown.
+            return
+
+        case md := <-me.metricDescriptorC:
+            me.exportMetricDescriptor(md)
         }
-        // TODO: We may want to compare current MD vs. previous and validate no changes.
     }
-    close(me.mdsDone)
 }

 func (me *MetricsExporter) projectName() string {
@@ -215,14 +281,21 @@ func (me *MetricsExporter) projectName() string {
 }

 // Helper method to send metric descriptors to GCM.
-func (me *MetricsExporter) exportMetricDescriptor(ctx context.Context, md *metricpb.MetricDescriptor) error {
-    // export
+func (me *MetricsExporter) exportMetricDescriptor(md *metricpb.MetricDescriptor) {
+    if _, exists := me.mdCache[md.Type]; exists {
+        return
+    }
+    me.mdCache[md.Type] = md
+
     req := &monitoringpb.CreateMetricDescriptorRequest{
         Name: me.projectName(),
         MetricDescriptor: md,
     }
-    _, err := me.client.CreateMetricDescriptor(ctx, req)
-    return err
+    _, err := me.client.CreateMetricDescriptor(context.Background(), req)
+    if err != nil {
+        // TODO: Log-once on error, per metric descriptor?
+        me.obs.log.Error("Unable to send metric descriptor.", zap.Error(err), zap.Any("metric_descriptor", md))
+    }
 }

// Sends a user-custom-metric timeseries.
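
statusCodeToString and recordPointCountDataPoint, called by exportPendingTimeSerieses above, are defined elsewhere in the package and do not appear in this diff. If the former simply names the gRPC code, a minimal sketch would look like this (hypothetical; the real helper may differ):

func statusCodeToStringSketch(s *status.Status) string {
    // codes.Code implements fmt.Stringer, producing names such as
    // "OK", "Unavailable", or "DeadlineExceeded".
    return s.Code().String()
}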
@@ -925,3 +998,53 @@ func mapMetricPointKind(m pdata.Metric) (metricpb.MetricDescriptor_MetricKind, m
     }
     return kind, typ
 }

+func (me *MetricsExporter) exportTimeSeriesRunner() {
+    defer me.goroutines.Done()
+    me.batchTimeoutTimer = time.NewTimer(batchTimeout)
+    for {
+        select {
+        case <-me.shutdownC:
+            for {
+                // We are shutting down. Publish all the pending
+                // items on the channel before we stop.
+                select {
+                case item := <-me.timeSeriesC:
+                    me.processItem(item)
+                default:
+                    goto DONE
+                }
+            }
+        DONE:
+            if len(me.pendingTimeSerieses) > 0 {
+                me.exportPendingTimeSerieses(context.TODO())
+            }
+            // Return and continue graceful shutdown.
+            return
+        case item := <-me.timeSeriesC:
+            me.processItem(item)
+        case <-me.batchTimeoutTimer.C:
+            if len(me.pendingTimeSerieses) > 0 {
+                me.exportPendingTimeSerieses(context.TODO())
+            }
+            me.batchTimeoutTimer.Reset(batchTimeout)
+        }
+    }
+}
+
+func (me *MetricsExporter) processItem(ts *monitoringpb.TimeSeries) {
+    // TODO: limit the size of this slice and exert back-pressure.
+    me.pendingTimeSerieses = append(me.pendingTimeSerieses, ts)
+    sent := false
+    for len(me.pendingTimeSerieses) >= sendBatchSize {
+        sent = true
+        me.exportPendingTimeSerieses(context.TODO())
+    }
+
+    if sent {
+        // Drain the timer before resetting so a stale tick can't fire early.
+        if !me.batchTimeoutTimer.Stop() {
+            <-me.batchTimeoutTimer.C
+        }
+        me.batchTimeoutTimer.Reset(batchTimeout)
+    }
+}
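
The Stop/drain/Reset sequence in processItem is the documented pattern for reusing an active time.Timer: Stop returns false when the timer has already fired, and the stale tick must then be drained from the channel before Reset, otherwise the next select would wake immediately and cut the batch window short. A standalone illustration of the idiom (illustrative only):

t := time.NewTimer(batchTimeout)
// ... after flushing a batch for a reason other than the timer firing:
if !t.Stop() {
    // The timer fired while we were flushing; consume the stale tick
    // so that the post-Reset <-t.C waits a full batchTimeout.
    <-t.C
}
t.Reset(batchTimeout)

This drain is safe only because a single goroutine, exportTimeSeriesRunner, owns both the timer and its channel; with multiple readers the blocking receive could deadlock.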
