Replace deprecated _xpack endpoints (#9656)
In 7.0.0, Elasticsearch is deprecating most `_xpack/*` endpoints. See elastic/elasticsearch#35958.

This PR updates the Beats codebase, except test fixtures, with the appropriate replacements for the deprecated endpoints.
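
The replacement is purely mechanical: keep calling the old `_xpack/*` path on a 6.x cluster and switch to the new path from 7.0 on, keyed off the major version the cluster reports. A minimal sketch of that idea (the `endpointFor` helper and the endpoint table below are illustrative only, not the exact Beats code):

```go
package main

import "fmt"

// Deprecated _xpack path -> its 7.0 replacement (illustrative subset of the endpoints Beats uses).
var replacements = map[string]string{
	"/_xpack/license":              "/_license",
	"/_xpack/ml/anomaly_detectors": "/_ml/anomaly_detectors",
	"/_xpack/ml/datafeeds":         "/_ml/datafeeds",
	"/_xpack/security/role":        "/_security/role",
	"/_xpack/security/user":        "/_security/user",
}

// endpointFor picks the path to call for a given Elasticsearch major version:
// the old _xpack path for 6.x, the replacement for 7.0 and later.
func endpointFor(esMajor int, xpackPath string) string {
	if esMajor >= 7 {
		if repl, ok := replacements[xpackPath]; ok {
			return repl
		}
	}
	return xpackPath
}

func main() {
	fmt.Println(endpointFor(6, "/_xpack/ml/datafeeds")) // /_xpack/ml/datafeeds
	fmt.Println(endpointFor(7, "/_xpack/ml/datafeeds")) // /_ml/datafeeds
}
```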
ycombinator authored Feb 5, 2019
1 parent b038211 commit fd98c11
Showing 13 changed files with 121 additions and 62 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -216,6 +216,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support to read ILM policy from external JSON file. {pull}10347[10347]
- Add `overwrite` and `check_exists` settings to ILM support. {pull}10347[10347]
- Generate Kibana index pattern on demand instead of using a local file. {pull}10478[10478]
- Calls to Elasticsearch X-Pack APIs made by Beats no longer trigger deprecation warnings in the Elasticsearch logs. {pull}9656[9656]

*Auditbeat*

9 changes: 0 additions & 9 deletions filebeat/docs/fields.asciidoc
@@ -4381,19 +4381,10 @@ The name of the action that was executed
*`elasticsearch.audit.url.params`*::
+
--
type: keyword
example: {username=jacknich2}
REST URI parameters
*`elasticsearch.audit.url.params.text`*::
+
--
type: text
--
--
*`elasticsearch.audit.indices`*::
4 changes: 0 additions & 4 deletions filebeat/module/elasticsearch/audit/_meta/fields.yml
@@ -29,10 +29,6 @@
- name: url.params
description: "REST URI parameters"
example: "{username=jacknich2}"
type: keyword
multi_fields:
- name: text
type: text
- name: indices
description: "Indices accessed by action"
example: [ "foo-2019.01.04", "foo-2019.01.03", "foo-2019.01.06" ]
2 changes: 1 addition & 1 deletion filebeat/module/elasticsearch/fields.go


26 changes: 19 additions & 7 deletions filebeat/tests/system/test_ml.py
@@ -7,6 +7,7 @@
from elasticsearch import Elasticsearch
import logging
from parameterized import parameterized
import semver


class Test(BaseTest):
@@ -48,23 +49,34 @@ def _run_ml_test(self, modules_flag):

from elasticsearch import AuthorizationException

es_info = self.es.info()
version = semver.parse(es_info["version"]["number"])
if version["major"] < 7:
start_trial_api_url = "/_xpack/license/start_trial?acknowledge=true"
ml_datafeeds_url = "/_xpack/ml/datafeeds/"
ml_anomaly_detectors_url = "/_xpack/ml/anomaly_detectors/"
else:
start_trial_api_url = "/_license/start_trial?acknowledge=true"
ml_datafeeds_url = "/_ml/datafeeds/"
ml_anomaly_detectors_url = "/_ml/anomaly_detectors/"

try:
output = self.es.transport.perform_request("POST", "/_xpack/license/start_trial?acknowledge=true")
output = self.es.transport.perform_request("POST", start_trial_api_url)
except AuthorizationException:
print("License already enabled")

print("Test modules_flag: {}".format(modules_flag))

# Clean any previous state
for df in self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")["datafeeds"]:
for df in self.es.transport.perform_request("GET", ml_datafeeds_url)["datafeeds"]:
if df["datafeed_id"] == 'filebeat-nginx-access-response_code':
self.es.transport.perform_request(
"DELETE", "/_xpack/ml/datafeeds/" + df["datafeed_id"])
"DELETE", "/_ml/datafeeds/" + df["datafeed_id"])

for df in self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/")["jobs"]:
for df in self.es.transport.perform_request("GET", ml_anomaly_detectors_url)["jobs"]:
if df["job_id"] == 'datafeed-filebeat-nginx-access-response_code':
self.es.transport.perform_request(
"DELETE", "/_xpack/ml/anomaly_detectors/" + df["job_id"])
"DELETE", ml_anomaly_detectors_url + df["job_id"])

shutil.rmtree(os.path.join(self.working_dir,
"modules.d"), ignore_errors=True)
@@ -109,10 +121,10 @@ def _run_ml_test(self, modules_flag):
# Check result
self.wait_until(lambda: "filebeat-nginx-access-response_code" in
(df["job_id"] for df in self.es.transport.perform_request(
"GET", "/_xpack/ml/anomaly_detectors/")["jobs"]),
"GET", ml_anomaly_detectors_url)["jobs"]),
max_timeout=60)
self.wait_until(lambda: "datafeed-filebeat-nginx-access-response_code" in
(df["datafeed_id"] for df in self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")["datafeeds"]))
(df["datafeed_id"] for df in self.es.transport.perform_request("GET", ml_datafeeds_url)["datafeeds"]))

beat.kill()

6 changes: 3 additions & 3 deletions libbeat/docs/security/basic-auth.asciidoc
@@ -29,7 +29,7 @@ You can create roles from the **Management / Roles** UI in {kib} or through the
ifeval::["{beatname_lc}"!="filebeat"]
["source","sh",subs="attributes,callouts"]
---------------------------------------------------------------
POST _xpack/security/role/{beat_default_index_prefix}_writer
POST _security/role/{beat_default_index_prefix}_writer
{
"cluster": ["manage_index_templates","monitor"],
"indices": [
@@ -46,7 +46,7 @@ endif::[]
ifeval::["{beatname_lc}"=="filebeat"]
["source","sh",subs="attributes,callouts"]
---------------------------------------------------------------
POST _xpack/security/role/{beat_default_index_prefix}_writer
POST _security/role/{beat_default_index_prefix}_writer
{
"cluster": ["manage_index_templates","monitor","manage_ingest_pipelines"], <1>
"indices": [
@@ -81,7 +81,7 @@ named ++{beat_default_index_prefix}_internal++ that has the
--
["source","sh",subs="attributes,callouts"]
---------------------------------------------------------------
POST /_xpack/security/user/{beat_default_index_prefix}_internal
POST /_security/user/{beat_default_index_prefix}_internal
{
"password" : "{pwd}",
"roles" : [ "{beat_default_index_prefix}_writer","kibana_user"],
4 changes: 2 additions & 2 deletions libbeat/docs/security/user-access.asciidoc
@@ -16,7 +16,7 @@ You can create roles from the **Management > Roles** UI in {kib} or through the
--
["source","sh",subs="attributes,callouts"]
---------------------------------------------------------------
POST _xpack/security/role/{beat_default_index_prefix}_reader
POST _security/role/{beat_default_index_prefix}_reader
{
"indices": [
{
@@ -43,7 +43,7 @@ following request grants ++{beat_default_index_prefix}_user++ the
--
["source", "sh", subs="attributes,callouts"]
---------------------------------------------------------------
POST /_xpack/security/user/{beat_default_index_prefix}_user
POST /_security/user/{beat_default_index_prefix}_user
{
"password" : "{pwd}",
"roles" : [ "{beat_default_index_prefix}_reader","kibana_user"],
23 changes: 15 additions & 8 deletions libbeat/ml-importer/importer.go
@@ -35,11 +35,11 @@ import (
)

var (
esDataFeedURL = "/_xpack/ml/datafeeds/datafeed-%s"
esJobURL = "/_xpack/ml/anomaly_detectors/%s"
kibanaGetModuleURL = "/api/ml/modules/get_module/%s"
kibanaRecognizeURL = "/api/ml/modules/recognize/%s"
kibanaSetupModuleURL = "/api/ml/modules/setup/%s"
esMLDataFeedURLSuffix = "/datafeeds/datafeed-%s"
esMLJobURLSuffix = "/anomaly_detectors/%s"
kibanaGetModuleURL = "/api/ml/modules/get_module/%s"
kibanaRecognizeURL = "/api/ml/modules/recognize/%s"
kibanaSetupModuleURL = "/api/ml/modules/setup/%s"
)

// MLConfig contains the required configuration for loading one job and the associated
@@ -121,11 +121,11 @@ func readJSONFile(path string) (common.MapStr, error) {

// ImportMachineLearningJob uploads the job and datafeed configuration to ES/xpack.
func ImportMachineLearningJob(esClient MLLoader, cfg *MLConfig) error {
jobURL := fmt.Sprintf(esJobURL, cfg.ID)
datafeedURL := fmt.Sprintf(esDataFeedURL, cfg.ID)
esVersion := esClient.GetVersion()
jobURL := makeMLURLPerESVersion(esVersion, fmt.Sprintf(esMLJobURLSuffix, cfg.ID))
datafeedURL := makeMLURLPerESVersion(esVersion, fmt.Sprintf(esMLDataFeedURLSuffix, cfg.ID))

if len(cfg.MinVersion) > 0 {
esVersion := esClient.GetVersion()
if !esVersion.IsValid() {
return errors.New("Invalid Elasticsearch version")
}
@@ -275,3 +275,10 @@ func checkResponse(r []byte) error {

return errs.Err()
}

func makeMLURLPerESVersion(esVersion common.Version, mlURLPathSuffix string) string {
if esVersion.Major < 7 {
return "/_xpack/ml" + mlURLPathSuffix
}
return "/_ml" + mlURLPathSuffix
}
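
For reference, a standalone sketch of the URLs the new helper yields for the two suffixes declared above; the local `version` struct stands in for libbeat's `common.Version`, and the job/datafeed IDs are made up:

```go
package main

import "fmt"

// version approximates libbeat's common.Version for this sketch; only Major matters here.
type version struct{ Major int }

// makeMLURLPerESVersion mirrors the helper above: _xpack/ml paths before 7.0, _ml paths from 7.0 on.
func makeMLURLPerESVersion(esVersion version, mlURLPathSuffix string) string {
	if esVersion.Major < 7 {
		return "/_xpack/ml" + mlURLPathSuffix
	}
	return "/_ml" + mlURLPathSuffix
}

func main() {
	datafeedSuffix := fmt.Sprintf("/datafeeds/datafeed-%s", "filebeat-nginx") // esMLDataFeedURLSuffix
	jobSuffix := fmt.Sprintf("/anomaly_detectors/%s", "filebeat-nginx")       // esMLJobURLSuffix

	fmt.Println(makeMLURLPerESVersion(version{Major: 6}, datafeedSuffix)) // /_xpack/ml/datafeeds/datafeed-filebeat-nginx
	fmt.Println(makeMLURLPerESVersion(version{Major: 7}, jobSuffix))      // /_ml/anomaly_detectors/filebeat-nginx
}
```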
12 changes: 9 additions & 3 deletions libbeat/ml-importer/importer_integration_test.go
@@ -128,9 +128,15 @@ func TestImportJobs(t *testing.T) {
err = ImportMachineLearningJob(client, &mlconfig)
assert.NoError(t, err)

// check by GETing back
var mlBaseURL string
if client.GetVersion().Major < 7 {
mlBaseURL = "/_xpack/ml"
} else {
mlBaseURL = "/_ml"
}

status, response, err := client.Request("GET", "/_xpack/ml/anomaly_detectors", "", nil, nil)
// check by GETing back
status, response, err := client.Request("GET", mlBaseURL+"/anomaly_detectors", "", nil, nil)
assert.NoError(t, err)
assert.Equal(t, 200, status)

@@ -157,7 +163,7 @@
}
assert.True(t, found)

status, response, err = client.Request("GET", "/_xpack/ml/datafeeds", "", nil, nil)
status, response, err = client.Request("GET", mlBaseURL+"/datafeeds", "", nil, nil)
assert.NoError(t, err)
assert.Equal(t, 200, status)

13 changes: 12 additions & 1 deletion metricbeat/module/elasticsearch/elasticsearch.go
@@ -211,9 +211,20 @@ func GetLicense(http *helper.HTTP, resetURI string) (*License, error) {
// First, check the cache
license := licenseCache.get()

info, err := GetInfo(http, resetURI)
if err != nil {
return nil, err
}
var licensePath string
if info.Version.Number.Major < 7 {
licensePath = "_xpack/license"
} else {
licensePath = "_license"
}

// Not cached, fetch license from Elasticsearch
if license == nil {
content, err := fetchPath(http, resetURI, "_xpack/license", "")
content, err := fetchPath(http, resetURI, licensePath, "")
if err != nil {
return nil, err
}
44 changes: 30 additions & 14 deletions metricbeat/module/elasticsearch/elasticsearch_integration_test.go
@@ -66,17 +66,22 @@ func TestFetch(t *testing.T) {
err := createIndex(host)
assert.NoError(t, err)

err = enableTrialLicense(host)
version, err := getElasticsearchVersion(host)
if err != nil {
t.Fatal("getting elasticsearch version", err)
}

err = enableTrialLicense(host, version)
assert.NoError(t, err)

err = createMLJob(host)
err = createMLJob(host, version)
assert.NoError(t, err)

err = createCCRStats(host)
assert.NoError(t, err)

for _, metricSet := range metricSets {
checkSkip(t, metricSet, host)
checkSkip(t, metricSet, version)
t.Run(metricSet, func(t *testing.T) {
f := mbtest.NewReportingMetricSetV2(t, getConfig(metricSet))
events, errs := mbtest.ReportingFetchV2(f)
@@ -95,8 +100,14 @@ func TestData(t *testing.T) {
compose.EnsureUp(t, "elasticsearch")

host := net.JoinHostPort(getEnvHost(), getEnvPort())

version, err := getElasticsearchVersion(host)
if err != nil {
t.Fatal("getting elasticsearch version", err)
}

for _, metricSet := range metricSets {
checkSkip(t, metricSet, host)
checkSkip(t, metricSet, version)
t.Run(metricSet, func(t *testing.T) {
f := mbtest.NewReportingMetricSetV2(t, getConfig(metricSet))
err := mbtest.WriteEventsReporterV2(f, t, metricSet)
@@ -164,10 +175,15 @@ func createIndex(host string) error {
}

// enableTrialLicense enables the trial license in case it is not enabled yet
func enableTrialLicense(host string) error {
func enableTrialLicense(host string, version *common.Version) error {
client := &http.Client{}

enableXPackURL := "/_xpack/license/start_trial?acknowledge=true"
var enableXPackURL string
if version.Major < 7 {
enableXPackURL = "/_xpack/license/start_trial?acknowledge=true"
} else {
enableXPackURL = "/_license/start_trial?acknowledge=true"
}

req, err := http.NewRequest("POST", "http://"+host+enableXPackURL, nil)
if err != nil {
@@ -191,14 +207,19 @@ func enableTrialLicense(host string) error {
return nil
}

func createMLJob(host string) error {
func createMLJob(host string, version *common.Version) error {

mlJob, err := ioutil.ReadFile("ml_job/_meta/test/test_job.json")
if err != nil {
return err
}

jobURL := "/_xpack/ml/anomaly_detectors/total-requests"
var jobURL string
if version.Major < 7 {
jobURL = "/_xpack/ml/anomaly_detectors/total-requests"
} else {
jobURL = "/_ml/anomaly_detectors/total-requests"
}

if checkExists("http://" + host + jobURL) {
return nil
@@ -279,16 +300,11 @@ func checkExists(url string) bool {
return false
}

func checkSkip(t *testing.T, metricset string, host string) {
func checkSkip(t *testing.T, metricset string, version *common.Version) {
if metricset != "ccr" {
return
}

version, err := getElasticsearchVersion(host)
if err != nil {
t.Fatal("getting elasticsearch version", err)
}

isCCRStatsAPIAvailable := elastic.IsFeatureAvailable(version, elasticsearch.CCRStatsAPIAvailableVersion)

if !isCCRStatsAPIAvailable {
11 changes: 9 additions & 2 deletions metricbeat/module/elasticsearch/ml_job/ml_job.go
@@ -33,7 +33,7 @@ func init() {
}

const (
jobPath = "/_xpack/ml/anomaly_detectors/_all/_stats"
jobPathSuffix = "/anomaly_detectors/_all/_stats"
)

// MetricSet for ml job
@@ -45,10 +45,11 @@ type MetricSet struct {
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Get the stats from the local node
ms, err := elasticsearch.NewMetricSet(base, jobPath)
ms, err := elasticsearch.NewMetricSet(base, "") // servicePath will be set in Fetch() based on ES version
if err != nil {
return nil, err
}

return &MetricSet{MetricSet: ms}, nil
}

@@ -74,6 +75,12 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
return
}

if info.Version.Number.Major < 7 {
m.SetServiceURI("/_xpack/ml" + jobPathSuffix)
} else {
m.SetServiceURI("/_ml" + jobPathSuffix)
}

content, err := m.HTTP.FetchContent()
if err != nil {
elastic.ReportAndLogError(err, r, m.Log)