Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,21 @@ COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

--------------------------------------------------------------------
Dependency: github.com/coreos/go-semver
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this package still somewhere used?

Copy link
Contributor Author

@ycombinator ycombinator Aug 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be any more. This section probably got re-introduced during a rebase or something. I will remove it. Thanks for catching!

Revision: e214231b295a8ea9479f11b70b35d5acf3556d9b
License type (autodetected): Apache-2.0
./metricbeat/vendor/github.com/coreos/go-semver/LICENSE:
--------------------------------------------------------------------
Apache License 2.0

-------NOTICE-----
CoreOS Project
Copyright 2018 CoreOS, Inc

This product includes software developed at CoreOS, Inc.
(http://www.coreos.com/).

--------------------------------------------------------------------
Dependency: github.com/davecgh/go-spew
Version: v1.1.0
Expand Down
26 changes: 20 additions & 6 deletions metricbeat/module/kibana/kibana.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,13 @@ import (
"github.com/elastic/beats/metricbeat/mb"
)

// StatsAPIAvailableVersion is the version of Kibana since when the stats API is available
const StatsAPIAvailableVersion = "6.4.0"
const (
// StatsAPIAvailableVersion is the version of Kibana since when the stats API is available
StatsAPIAvailableVersion = "6.4.0"

// SettingsAPIAvailableVersion is the version of Kibana since when the settings API is available
SettingsAPIAvailableVersion = "6.5.0"
)

// ReportErrorForMissingField reports and returns an error message for the given
// field being missing in API response received from Kibana
Expand Down Expand Up @@ -67,21 +72,30 @@ func GetVersion(http *helper.HTTP, currentPath string) (string, error) {
return versionStr, nil
}

// IsStatsAPIAvailable returns whether the stats API is available in the given version of Kibana
func IsStatsAPIAvailable(kibanaVersion string) (bool, error) {
currentVersion, err := common.NewVersion(kibanaVersion)
func isKibanaAPIAvailable(currentKibanaVersion, apiAvailableInKibanaVersion string) (bool, error) {
currentVersion, err := common.NewVersion(currentKibanaVersion)
if err != nil {
return false, err
}

wantVersion, err := common.NewVersion(StatsAPIAvailableVersion)
wantVersion, err := common.NewVersion(apiAvailableInKibanaVersion)
if err != nil {
return false, err
}

return !currentVersion.LessThan(wantVersion), nil
}

// IsStatsAPIAvailable returns whether the stats API is available in the given version of Kibana
func IsStatsAPIAvailable(currentKibanaVersion string) (bool, error) {
return isKibanaAPIAvailable(currentKibanaVersion, StatsAPIAvailableVersion)
}

// IsSettingsAPIAvailable returns whether the settings API is available in the given version of Kibana
func IsSettingsAPIAvailable(currentKibanaVersion string) (bool, error) {
return isKibanaAPIAvailable(currentKibanaVersion, SettingsAPIAvailableVersion)
}

func fetchPath(http *helper.HTTP, currentPath, newPath string) ([]byte, error) {
currentURI := http.GetURI()
defer http.SetURI(currentURI) // Reset after this request
Expand Down
18 changes: 18 additions & 0 deletions metricbeat/module/kibana/stats/_meta/test/settings.700.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add a test file for 6.3 or 6.4?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The settings API in Kibana is not expected to exist until 6.5. Once it has been implemented for 6.5, I can add a settings.650.json file here. Should I do that?

Is your concern about how Metricbeat >= 6.5.0 will react when running with Kibana < 6.5.0?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Seems like we should get the version number of KB on the first request and make decisions based on it.

Copy link
Contributor Author

@ycombinator ycombinator Jul 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I have a question about this. Prior to 6.4.0 there is no GET /api/stats in Kibana either. Does that mean we should be checking the version of Kibana before making that call too? [EDIT] This would require yet another Kibana API call, to GET /api/status first.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think we will need such a check or at least provide a good error message.

I think we could check the version once during New and just assume it stays the same over the lifetime of the metricset for now. So we would only do it once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to add this version check in New in a separate PR since it affects code that has already been merged into master (#7525). I will rebase this PR on top of my new PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Version check PR: #7697

Copy link
Contributor Author

@ycombinator ycombinator Jul 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented version check for Kibana settings API in 85b2438.

"cluster_uuid":"u5ii0pnQRka_P0gimfmthg",
"settings":{
"xpack":{
"default_admin_email":"[email protected]"
},
"kibana":{
"uuid":"5b2de169-2785-441b-ae8c-186a1936b17d",
"name":"Janes-MBP-2",
"index":".kibana",
"host":"localhost",
"transport_address":"localhost:5601",
"version":"7.0.0-alpha1",
"snapshot":false,
"status":"green"
}
}
}
76 changes: 53 additions & 23 deletions metricbeat/module/kibana/stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

var (
schemaXPackMonitoring = s.Schema{
schemaXPackMonitoringStats = s.Schema{
"concurrent_connections": c.Int("concurrent_connections"),
"os": c.Dict("os", s.Schema{
"load": c.Dict("load", s.Schema{
Expand Down Expand Up @@ -105,61 +105,91 @@ var (
}
)

func eventMappingXPack(r mb.ReporterV2, intervalMs int64, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
r.Error(err)
return err
type dataParser func(mb.ReporterV2, common.MapStr, time.Time) (string, string, common.MapStr, error)

func statsDataParser(r mb.ReporterV2, data common.MapStr, now time.Time) (string, string, common.MapStr, error) {
clusterUUID, ok := data["clusterUuid"].(string)
if !ok {
return "", "", nil, elastic.ReportErrorForMissingField("clusterUuid", elastic.Kibana, r)
}

kibanaStatsFields, err := schemaXPackMonitoring.Apply(data)
kibanaStatsFields, err := schemaXPackMonitoringStats.Apply(data)
if err != nil {
r.Error(err)
return err
return "", "", nil, err
}

process, ok := data["process"].(map[string]interface{})
if !ok {
return elastic.ReportErrorForMissingField("process", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("process", elastic.Kibana, r)
}
memory, ok := process["memory"].(map[string]interface{})
if !ok {
return elastic.ReportErrorForMissingField("process.memory", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("process.memory", elastic.Kibana, r)
}

rss, ok := memory["resident_set_size_bytes"].(float64)
if !ok {
return elastic.ReportErrorForMissingField("process.memory.resident_set_size_bytes", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("process.memory.resident_set_size_bytes", elastic.Kibana, r)
}
kibanaStatsFields.Put("process.memory.resident_set_size_in_bytes", int64(rss))

timestamp := time.Now()
kibanaStatsFields.Put("timestamp", timestamp)
kibanaStatsFields.Put("timestamp", now)

// Make usage field passthrough as-is
usage, ok := data["usage"].(map[string]interface{})
if !ok {
return elastic.ReportErrorForMissingField("usage", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("usage", elastic.Kibana, r)
}
kibanaStatsFields.Put("usage", usage)

clusterUUID, ok := data["clusterUuid"].(string)
return "kibana_stats", clusterUUID, kibanaStatsFields, nil
}

func settingsDataParser(r mb.ReporterV2, data common.MapStr, now time.Time) (string, string, common.MapStr, error) {
clusterUUID, ok := data["cluster_uuid"].(string)
if !ok {
return elastic.ReportErrorForMissingField("clusterUuid", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("cluster_uuid", elastic.Kibana, r)
}

kibanaSettingsFields, ok := data["settings"]
if !ok {
return "", "", nil, elastic.ReportErrorForMissingField("settings", elastic.Kibana, r)
}

return "kibana_settings", clusterUUID, kibanaSettingsFields.(map[string]interface{}), nil
}

func eventMappingXPack(r mb.ReporterV2, intervalMs int64, now time.Time, content []byte, dataParserFunc dataParser) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
r.Error(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we report an error in x-pack or just ignore it? I would opt for ignoring for now as it ends up in an other index and we will not make use of it.

return err
}

t, clusterUUID, fields, err := dataParserFunc(r, data, now)
if err != nil {
return err
}

var event mb.Event
event.RootFields = common.MapStr{
"cluster_uuid": clusterUUID,
"timestamp": timestamp,
"timestamp": now,
"interval_ms": intervalMs,
"type": "kibana_stats",
"kibana_stats": kibanaStatsFields,
"type": t,
t: fields,
}

event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Kibana)
r.Event(event)

r.Event(event)
return nil
}

func eventMappingStatsXPack(r mb.ReporterV2, intervalMs int64, now time.Time, content []byte) error {
return eventMappingXPack(r, intervalMs, now, content, statsDataParser)
}

func eventMappingSettingsXPack(r mb.ReporterV2, intervalMs int64, now time.Time, content []byte) error {
return eventMappingXPack(r, intervalMs, now, content, settingsDataParser)
}
26 changes: 24 additions & 2 deletions metricbeat/module/kibana/stats/data_xpack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ import (
"io/ioutil"
"path/filepath"
"testing"
"time"

mbtest "github.com/elastic/beats/metricbeat/mb/testing"

"github.com/stretchr/testify/assert"
)

func TestEventMappingXPack(t *testing.T) {
func TestEventMappingStatsXPack(t *testing.T) {

files, err := filepath.Glob("./_meta/test/stats-legacy.*.json")
assert.NoError(t, err)
Expand All @@ -39,7 +40,28 @@ func TestEventMappingXPack(t *testing.T) {
assert.NoError(t, err)

reporter := &mbtest.CapturingReporterV2{}
err = eventMappingXPack(reporter, 10000, input)
now := time.Now()

err = eventMappingStatsXPack(reporter, 10000, now, input)
assert.NoError(t, err, f)
assert.True(t, len(reporter.GetEvents()) >= 1, f)
assert.Equal(t, 0, len(reporter.GetErrors()), f)
}
}

func TestEventMappingSettingsXPack(t *testing.T) {

files, err := filepath.Glob("./_meta/test/settings.*.json")
assert.NoError(t, err)

for _, f := range files {
input, err := ioutil.ReadFile(f)
assert.NoError(t, err)

reporter := &mbtest.CapturingReporterV2{}
now := time.Now()

err = eventMappingSettingsXPack(reporter, 10000, now, input)
assert.NoError(t, err, f)
assert.True(t, len(reporter.GetEvents()) >= 1, f)
assert.Equal(t, 0, len(reporter.GetErrors()), f)
Expand Down
70 changes: 59 additions & 11 deletions metricbeat/module/kibana/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package stats

import (
"fmt"
"strings"
"time"

"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/metricbeat/helper"
Expand All @@ -36,7 +38,8 @@ func init() {
}

const (
statsPath = "api/stats"
statsPath = "api/stats"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about making these 2 a const?

settingsPath = "api/settings"
)

var (
Expand All @@ -50,7 +53,8 @@ var (
// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
statsHTTP *helper.HTTP
settingsHTTP *helper.HTTP
xPackEnabled bool
}

Expand All @@ -63,22 +67,22 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, err
}

http, err := helper.NewHTTP(base)
statsHTTP, err := helper.NewHTTP(base)
if err != nil {
return nil, err
}

kibanaVersion, err := kibana.GetVersion(http, statsPath)
kibanaVersion, err := kibana.GetVersion(statsHTTP, statsPath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I just realised that bugs me a bit with this implementation is that it requires Kibana to be up and running normally when Metricbeat is started. Assuming Kibana is flaky during the moment the check happens, the metricset will not be running but error out even though Kibana could be totally fine again 1 sec later.

Not for this PR but thought worth mentioning.

Copy link
Contributor Author

@ycombinator ycombinator Aug 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I suppose we could move this check (and the code dependent on it) into Fetch() but that might be too expensive to run every fetch cycle?

Copy link
Contributor Author

@ycombinator ycombinator Aug 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need another metricset lifecycle method in addition to New() and Fetch(), say Ping() (I know, not a great name), which runs periodically but less frequently than Fetch(). The frequency can be defined per-metricset in that metricset's New() function.

By default this Ping() method would do nothing. A metricset could, however, override it. The code in Ping() could then set fields in the metricset's struct so the code in Fetch() can use the data in those fields.

In the case of this metricset, we would move the Kibana version checking (and dependent) code into Ping().

Thoughts? I agree that this would be beyond the scope of this PR but, if you like it, I can make a separate issue/followup PR for it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about using fetch but do it only once? I remember we do something like that in other cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merged as it's something we should discuss as a follow up.

if err != nil {
return nil, err
}

isAPIAvailable, err := kibana.IsStatsAPIAvailable(kibanaVersion)
isStatsAPIAvailable, err := kibana.IsStatsAPIAvailable(kibanaVersion)
if err != nil {
return nil, err
}

if !isAPIAvailable {
if !isStatsAPIAvailable {
const errorMsg = "The kibana stats metricset is only supported with Kibana >= %v. You are currently running Kibana %v"
return nil, fmt.Errorf(errorMsg, kibana.StatsAPIAvailableVersion, kibanaVersion)
}
Expand All @@ -87,12 +91,38 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Experimental("The experimental xpack.enabled flag in kibana/stats metricset is enabled.")

// Use legacy API response so we can passthru usage as-is
http.SetURI(http.GetURI() + "&legacy=true")
statsHTTP.SetURI(statsHTTP.GetURI() + "&legacy=true")
}

var settingsHTTP *helper.HTTP
if config.XPackEnabled {
cfgwarn.Experimental("The experimental xpack.enabled flag in kibana/stats metricset is enabled.")

isSettingsAPIAvailable, err := kibana.IsSettingsAPIAvailable(kibanaVersion)
if err != nil {
return nil, err
}

if !isSettingsAPIAvailable {
const errorMsg = "The kibana stats metricset with X-Pack enabled is only supported with Kibana >= %v. You are currently running Kibana %v"
return nil, fmt.Errorf(errorMsg, kibana.SettingsAPIAvailableVersion, kibanaVersion)
}

settingsHTTP, err = helper.NewHTTP(base)
if err != nil {
return nil, err
}

// HACK! We need to do this because there might be a basepath involved, so we
// only search/replace the actual API paths
settingsURI := strings.Replace(statsHTTP.GetURI(), statsPath, settingsPath, 1)
settingsHTTP.SetURI(settingsURI)
}

return &MetricSet{
base,
http,
statsHTTP,
settingsHTTP,
config.XPackEnabled,
}, nil
}
Expand All @@ -101,17 +131,35 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch(r mb.ReporterV2) {
content, err := m.http.FetchContent()
intervalMs := m.Module().Config().Period.Nanoseconds() / 1000 / 1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have this code inside the x-pack part as it's only used there I think? Perhaps we can find a way to only if that xPackEnabled check once and run instead also in fetchStats. Why not move this to line 151 with the fetchSettings?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I fully understood your comment but I took a stab at it in 3afa1a2. Take a look and let me know what you think. Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

now := time.Now()

m.fetchStats(r, intervalMs, now)
if m.xPackEnabled {
m.fetchSettings(r, intervalMs, now)
}
}

func (m *MetricSet) fetchStats(r mb.ReporterV2, intervalMs int64, now time.Time) {
content, err := m.statsHTTP.FetchContent()
if err != nil {
r.Error(err)
return
}

if m.xPackEnabled {
intervalMs := m.Module().Config().Period.Nanoseconds() / 1000 / 1000
eventMappingXPack(r, intervalMs, content)
eventMappingStatsXPack(r, intervalMs, now, content)
} else {
eventMapping(r, content)
}
}

func (m *MetricSet) fetchSettings(r mb.ReporterV2, intervalMs int64, now time.Time) {
content, err := m.settingsHTTP.FetchContent()
if err != nil {
r.Error(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we report an error? I think we will not make use of it.

return
}

eventMappingSettingsXPack(r, intervalMs, now, content)
}
Loading