Skip to content

Commit

Permalink
Fixes intelsdi-x#1228 - metric request ambiguous
Browse files Browse the repository at this point in the history
* Updates validateMetric on subscriptionGroups to use GetMetrics instead
of GetMetric since some namespace requests can expand to include
multiple metrics.
  • Loading branch information
jcooklin committed Sep 23, 2016
1 parent b004142 commit b1934be
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 150 deletions.
29 changes: 17 additions & 12 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,10 +1046,10 @@ func TestCollectDynamicMetrics(t *testing.T) {
<-lpe.done
metrics, err := c.metricCatalog.Fetch(core.NewNamespace())
So(err, ShouldBeNil)
So(len(metrics), ShouldEqual, 6)
So(len(metrics), ShouldEqual, 8)
mts, err := c.metricCatalog.GetMetrics(core.NewNamespace("intel", "mock", "*", "baz"), 2)
So(err, ShouldBeNil)
So(len(mts), ShouldEqual, 1)
So(len(mts), ShouldEqual, 2)
m := mts[0]
errs := c.subscriptionGroups.validateMetric(m)
So(errs, ShouldBeNil)
Expand Down Expand Up @@ -1092,7 +1092,7 @@ func TestCollectDynamicMetrics(t *testing.T) {
So(err, ShouldBeNil)
So(hits, ShouldEqual, 0)
So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 10)
So(len(mts), ShouldEqual, 11)
mts, errs = c.CollectMetrics(taskID, nil)
hits, err = pool.CacheHits(m.namespace.String(), 2, taskID)
So(err, ShouldBeNil)
Expand All @@ -1101,7 +1101,7 @@ func TestCollectDynamicMetrics(t *testing.T) {
// So(hits, ShouldEqual, 1)

So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 10)
So(len(mts), ShouldEqual, 11)
pool.Unsubscribe(taskID)
pool.SelectAndKill(taskID, "unsubscription event")
So(pool.Count(), ShouldEqual, 0)
Expand Down Expand Up @@ -1211,7 +1211,7 @@ func TestCollectMetrics(t *testing.T) {
<-lpe.done
mts, err := c.MetricCatalog()
So(err, ShouldBeNil)
So(len(mts), ShouldEqual, 4)
So(len(mts), ShouldEqual, 5)

cd := cdata.NewNode()
cd.AddItem("password", ctypes.ConfigValueStr{Value: "testval"})
Expand Down Expand Up @@ -1305,7 +1305,7 @@ func TestCollectNonSpecifiedDynamicMetrics(t *testing.T) {
<-lpe.done
mts, err := c.MetricCatalog()
So(err, ShouldBeNil)
So(len(mts), ShouldEqual, 4)
So(len(mts), ShouldEqual, 5)

cd := cdata.NewNode()

Expand Down Expand Up @@ -1342,16 +1342,19 @@ func TestCollectNonSpecifiedDynamicMetrics(t *testing.T) {
So(len(mts), ShouldBeGreaterThan, len(requested))
// expected 10 metrics "/intel/mock/[host_id]/baz
// for hosts in range (0 - 9)
So(len(mts), ShouldEqual, 10)
So(len(mts), ShouldEqual, 11)
for _, m := range mts {
// ensure the collected metric's namespace starts with /intel/mock/host...
So(m.Namespace().String(), ShouldStartWith, core.NewNamespace("intel", "mock", "host").String())
So(m.Namespace().String(), ShouldStartWith, core.NewNamespace("intel", "mock").String())
So(m.Namespace().String(), ShouldContainSubstring, "baz")

// ensure the collected data coming back is from v1
So(m.Version(), ShouldEqual, 1)
// ensure the collected data is dynamic
isDynamic, _ := m.Namespace().IsDynamic()
So(isDynamic, ShouldBeTrue)
if !strings.Contains(m.Namespace().String(), "all") {
isDynamic, _ := m.Namespace().IsDynamic()
So(isDynamic, ShouldBeTrue)
}
}
}

Expand Down Expand Up @@ -1387,7 +1390,7 @@ func TestCollectSpecifiedDynamicMetrics(t *testing.T) {
So(err, ShouldBeNil)
// metric catalog should contain the 3 following metrics:
// /intel/mock/foo; /intel/mock/bar; /intel/mock/*/baz
So(len(mts), ShouldEqual, 3)
So(len(mts), ShouldEqual, 4)

Convey("collection for specified host id - positive", func() {
taskID := "task-01"
Expand Down Expand Up @@ -1987,7 +1990,9 @@ func TestDynamicMetricSubscriptionLoadLessMetrics(t *testing.T) {
Convey("metrics are collected from mock1 and mock2", func() {
// ensure the data coming back is from mock 1 and mock 2
for _, m := range mts2 {
if strings.Contains(m.Namespace().String(), "host") || strings.Contains(m.Namespace().String(), "bar") {
if strings.Contains(m.Namespace().String(), "host") ||
strings.Contains(m.Namespace().String(), "bar") ||
strings.Contains(m.Namespace().String(), "all") {
val, ok := m.Data().(int)
So(ok, ShouldEqual, true)
So(val, ShouldBeGreaterThan, 1000)
Expand Down
85 changes: 44 additions & 41 deletions control/subscription_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,62 +270,65 @@ func (p *subscriptionGroups) validatePluginSubscription(pl core.SubscribedPlugin

func (s *subscriptionGroups) validateMetric(
metric core.Metric) (serrs []serror.SnapError) {
m, err := s.metricCatalog.GetMetric(metric.Namespace(), metric.Version())
mts, err := s.metricCatalog.GetMetrics(metric.Namespace(), metric.Version())
if err != nil {
serrs = append(serrs, serror.New(err, map[string]interface{}{
"name": metric.Namespace().String(),
"version": metric.Version(),
}))
return serrs
}
for _, m := range mts {

// No metric found return error.
if m == nil {
serrs = append(
serrs, serror.New(
fmt.Errorf("no metric found cannot subscribe: (%s) version(%d)",
metric.Namespace(), metric.Version())))
return serrs
}
// No metric found return error.
if m == nil {
serrs = append(
serrs, serror.New(
fmt.Errorf("no metric found cannot subscribe: (%s) version(%d)",
metric.Namespace(), metric.Version())))
continue
}

m.config = metric.Config()
m.config = metric.Config()

typ, serr := core.ToPluginType(m.Plugin.TypeName())
if serr != nil {
return []serror.SnapError{serror.New(err)}
}
typ, serr := core.ToPluginType(m.Plugin.TypeName())
if serr != nil {
serrs = append(serrs, serror.New(err))
continue
}

// merge global plugin config
if m.config != nil {
m.config.ReverseMergeInPlace(
s.Config.Plugins.getPluginConfigDataNode(typ,
m.Plugin.Name(), m.Plugin.Version()))
} else {
m.config = s.Config.Plugins.getPluginConfigDataNode(typ,
m.Plugin.Name(), m.Plugin.Version())
}
// merge global plugin config
if m.config != nil {
m.config.ReverseMergeInPlace(
s.Config.Plugins.getPluginConfigDataNode(typ,
m.Plugin.Name(), m.Plugin.Version()))
} else {
m.config = s.Config.Plugins.getPluginConfigDataNode(typ,
m.Plugin.Name(), m.Plugin.Version())
}

// When a metric is added to the MetricCatalog, the policy of rules defined by the plugin is added to the metric's policy.
// If no rules are defined for a metric, we set the metric's policy to an empty ConfigPolicyNode.
// Checking m.policy for nil will not work, we need to check if rules are nil.
if m.policy.HasRules() {
if m.Config() == nil {
fields := log.Fields{
"metric": m.Namespace(),
"version": m.Version(),
"plugin": m.Plugin.Name(),
// When a metric is added to the MetricCatalog, the policy of rules defined by the plugin is added to the metric's policy.
// If no rules are defined for a metric, we set the metric's policy to an empty ConfigPolicyNode.
// Checking m.policy for nil will not work, we need to check if rules are nil.
if m.policy.HasRules() {
if m.Config() == nil {
fields := log.Fields{
"metric": m.Namespace(),
"version": m.Version(),
"plugin": m.Plugin.Name(),
}
serrs = append(serrs, serror.New(ErrConfigRequiredForMetric, fields))
continue
}
serrs = append(serrs, serror.New(ErrConfigRequiredForMetric, fields))
return serrs
}
ncdTable, errs := m.policy.Process(m.Config().Table())
if errs != nil && errs.HasErrors() {
for _, e := range errs.Errors() {
serrs = append(serrs, serror.New(e))
ncdTable, errs := m.policy.Process(m.Config().Table())
if errs != nil && errs.HasErrors() {
for _, e := range errs.Errors() {
serrs = append(serrs, serror.New(e))
}
continue
}
return serrs
m.config = cdata.FromTable(*ncdTable)
}
m.config = cdata.FromTable(*ncdTable)
}

return serrs
Expand Down
Loading

0 comments on commit b1934be

Please sign in to comment.