Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added IteratableGatherer for future zero alloc implementations. #927

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 31 additions & 9 deletions prometheus/promhttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ func Handler() http.Handler {
// instrumentation. Use the InstrumentMetricHandler function to apply the same
// kind of instrumentation as it is used by the Handler function.
func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
return HandlerForIteratable(prometheus.ToIteratableGatherer(reg), opts)
}

// HandlerForIteratable returns an uninstrumented http.Handler for the provided
// IteratableGatherer. The behavior of the Handler is defined by the provided
// HandlerOpts. Thus, HandlerFor is useful to create http.Handlers for custom
// Gatherers, with non-default HandlerOpts, and/or with custom (or no)
// instrumentation. Use the InstrumentMetricHandler function to apply the same
// kind of instrumentation as it is used by the Handler function.
func HandlerForIteratable(reg prometheus.IteratableGatherer, opts HandlerOpts) http.Handler {
var (
inFlightSem chan struct{}
errCnt = prometheus.NewCounterVec(
Expand Down Expand Up @@ -123,7 +133,8 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
return
}
}
mfs, err := reg.Gather()

iter, err := reg.GatherIterate()
if err != nil {
if opts.ErrorLog != nil {
opts.ErrorLog.Println("error gathering metrics:", err)
Expand All @@ -133,7 +144,7 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
case PanicOnError:
panic(err)
case ContinueOnError:
if len(mfs) == 0 {
if iter == nil {
// Still report the error if no metrics have been gathered.
httpError(rsp, err)
return
Expand Down Expand Up @@ -169,14 +180,22 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {

// handleError handles the error according to opts.ErrorHandling
// and returns true if we have to abort after the handling.
handleError := func(err error) bool {
handleError := func(encoding bool, err error) bool {
if err == nil {
return false
}
if opts.ErrorLog != nil {
opts.ErrorLog.Println("error encoding and sending metric family:", err)
if encoding {
if opts.ErrorLog != nil {
opts.ErrorLog.Println("error encoding and sending metric family:", err)
}
errCnt.WithLabelValues("encoding").Inc()
} else {
if opts.ErrorLog != nil {
opts.ErrorLog.Println("error gathering metrics:", err)
}
errCnt.WithLabelValues("gathering").Inc()
}
errCnt.WithLabelValues("encoding").Inc()

switch opts.ErrorHandling {
case PanicOnError:
panic(err)
Expand All @@ -191,14 +210,17 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
return false
}

for _, mf := range mfs {
if handleError(enc.Encode(mf)) {
for iter.Next() {
if handleError(true, enc.Encode(iter.At())) {
for iter.Next() {} // Exhaust iterator.
return
}
}
handleError(false, iter.Err())

if closer, ok := enc.(expfmt.Closer); ok {
// This in particular takes care of the final "# EOF\n" line for OpenMetrics.
if handleError(closer.Close()) {
if handleError(true, closer.Close()) {
return
}
}
Expand Down
72 changes: 72 additions & 0 deletions prometheus/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,78 @@ type Gatherer interface {
Gather() ([]*dto.MetricFamily, error)
}

// IteratableGatherer is the interface for the part of a registry in charge of gathering
// the collected metrics into a number of MetricFamilies. The IteratableGatherer interface
// comes with the same general implication as described for the Registerer
// interface.
type IteratableGatherer interface {
// GatherIterate allows iterating over a lexicographically sorted, uniquely named
// dto.MetricFamily that are obtained through calls the Collect method
// of the registered Collectors. GatherIterate ensures that the
// iterator is valid until exhausting (until Next is false).
// As an exception to the strict consistency requirements described for
// metric.Desc, GatherIterate will tolerate different sets of label names for
// metrics of the same metric family.
//
// Even if an error occurs, GatherIterate allows to iterate over as many metrics as
// possible. Hence, if a non-nil error is returned, the returned
// MetricFamilyIterator could be nil (in case of a fatal error that
// prevented any meaningful metric collection) or iteratable over a number of
// MetricFamily protobufs, some of which might be incomplete, and some
// might be missing altogether. The returned error (which might be a
// MultiError) explains the details. Note that this is mostly useful for
// debugging purposes. If the gathered protobufs are to be used for
// exposition in actual monitoring, it is almost always better to not
// expose an incomplete result and instead disregard the returned
// MetricFamily protobufs in case the returned error is non-nil.
//
// GatherIterate allows implementations to use zero-alloc caching and
// other efficiency techniques.
//
// Iterated dto.MetricFamily is read only and valid only until you invoke another
// Next(). Note that consumer is *always* required to exhaust iterator to release all resources.
GatherIterate() (MetricFamilyIterator, error)
}

// MetricFamilyIterator ...
type MetricFamilyIterator interface {
Next() bool
At() *dto.MetricFamily
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This helps to protect the addition and removal of whole MetricFamilies. But it doesn't protect editing existing MetricFamilies in place. For example, if your set of metrics is entirely static, you just want to change a value now and then by directly changing only the Value protobuf message deep down within a MetricFamily, you would still create data races with this approach.

In other words, after At() is called, the iterator has no control over the usage of the returned MetricFamily anymore. You only moved the problem one level down (from the MetricFamily slice to an individual MetricFamily).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree, I plan to explicitly mention in the interface that after Next you lose control over memory space pointed by pointer. See GatherIterate method comment. Also see implementation.

I don't see any other type safe way to do it. Even my initial idea with GatherForeach(f func(*dto.MetricFamily) error) error has the same issue - someone might reuse it.

We have to rely on runtime tests and interface comments, unfortunately. But this will work. WDYT?

Copy link
Member Author

@bwplotka bwplotka Oct 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well - there is one type safe manner. We could tie gathering with encoding in some GatherEncode(encoder) error - but:

  • it is a little bit package spaghetti,
  • tied to promhttp use as we might have to have this non trivial error mechanism that either stops, continues or panics ...

But might be way to go

Err() error
}

func ToIteratableGatherer(g Gatherer) IteratableGatherer {
return &iteratableGathererAdapter{g:g}
}

type iteratableGathererAdapter struct {
g Gatherer
}

func(a *iteratableGathererAdapter) GatherIterate() (MetricFamilyIterator, error) {
mfs, err := a.g.Gather()
return &mfIterator{mfs: mfs, i: -1}, err
}

type mfIterator struct {
mfs []*dto.MetricFamily
i int
}

func (m *mfIterator) Next() bool {
if m.i+1 >= len(m.mfs) {
return false
}
m.i++
return true
}

func (m *mfIterator) At() *dto.MetricFamily {
return m.mfs[m.i]
}

func (m *mfIterator) Err() error { return nil }

// Register registers the provided Collector with the DefaultRegisterer.
//
// Register is a shortcut for DefaultRegisterer.Register(c). See there for more
Expand Down