Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions auditbeat/tests/system/auditbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ class BaseTest(MetricbeatTest):
@classmethod
def setUpClass(self):
self.beat_name = "auditbeat"
self.beat_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../"))

if not hasattr(self, 'beat_path'):
self.beat_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../"))

super(MetricbeatTest, self).setUpClass()

def create_file(self, path, contents):
Expand Down
34 changes: 26 additions & 8 deletions libbeat/generator/fields/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,27 @@ type YmlFile struct {
Indent int
}

// NewYmlFile performs some checks and then creates and returns a YmlFile struct
func NewYmlFile(path string, indent int) (*YmlFile, error) {
_, err := os.Stat(path)

if os.IsNotExist(err) {
// skip
return nil, nil
}

if err != nil {
// return error
return nil, err
}

// All good, return file
return &YmlFile{
Path: path,
Indent: indent,
}, nil
}

func collectCommonFiles(esBeatsPath, beatPath string, fieldFiles []*YmlFile) ([]*YmlFile, error) {
commonFields := []string{
filepath.Join(beatPath, "_meta/fields.common.yml"),
Expand All @@ -52,16 +73,13 @@ func collectCommonFiles(esBeatsPath, beatPath string, fieldFiles []*YmlFile) ([]

var files []*YmlFile
for _, cf := range commonFields {
_, err := os.Stat(cf)
if os.IsNotExist(err) {
continue
} else if err != nil {
ymlFile, err := NewYmlFile(cf, 0)

if err != nil {
return nil, err
} else if ymlFile != nil {
files = append(files, ymlFile)
}
files = append(files, &YmlFile{
Path: cf,
Indent: 0,
})
}

files = append(files, libbeatFieldFiles...)
Expand Down
26 changes: 12 additions & 14 deletions libbeat/generator/fields/module_fields_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package fields

import (
"io/ioutil"
"os"
"path/filepath"
)

Expand Down Expand Up @@ -69,16 +68,15 @@ func CollectModuleFiles(modulesDir string) ([]*YmlFile, error) {

// CollectFiles collects all files for the given module including filesets
func CollectFiles(module string, modulesPath string) ([]*YmlFile, error) {

var files []*YmlFile

fieldsYmlPath := filepath.Join(modulesPath, module, "_meta/fields.yml")
if _, err := os.Stat(fieldsYmlPath); !os.IsNotExist(err) {
files = append(files, &YmlFile{
Path: fieldsYmlPath,
Indent: 0,
})
} else if !os.IsNotExist(err) && err != nil {
ymlFile, err := NewYmlFile(fieldsYmlPath, 0)

if err != nil {
return nil, err
} else if ymlFile != nil {
files = append(files, ymlFile)
}

modulesRoot := filepath.Base(modulesPath)
Expand All @@ -91,14 +89,14 @@ func CollectFiles(module string, modulesPath string) ([]*YmlFile, error) {
if !s.IsDir() {
continue
}

fieldsYmlPath = filepath.Join(modulesPath, module, s.Name(), "_meta/fields.yml")
if _, err = os.Stat(fieldsYmlPath); !os.IsNotExist(err) {
files = append(files, &YmlFile{
Path: fieldsYmlPath,
Indent: indentByModule[modulesRoot],
})
} else if !os.IsNotExist(err) && err != nil {
ymlFile, err := NewYmlFile(fieldsYmlPath, indentByModule[modulesRoot])

if err != nil {
return nil, err
} else if ymlFile != nil {
files = append(files, ymlFile)
}
}
return files, nil
Expand Down
4 changes: 2 additions & 2 deletions libbeat/scripts/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ PIP_INSTALL_PARAMS?=
BUILDID?=$(shell git rev-parse HEAD) ## @Building The build ID
VIRTUALENV_PARAMS?=
INTEGRATION_TESTS?=
FIND=. ${PYTHON_ENV}/bin/activate; find . -type f -not -path "*/vendor/*" -not -path "*/build/*" -not -path "*/.git/*"
FIND?=. ${PYTHON_ENV}/bin/activate; find . -type f -not -path "*/vendor/*" -not -path "*/build/*" -not -path "*/.git/*"
PERM_EXEC?=$(shell [ `uname -s` = "Darwin" ] && echo "+111" || echo "/a+x")

ifeq ($(DOCKER_CACHE),0)
Expand Down Expand Up @@ -201,7 +201,7 @@ integration-tests-environment: prepare-tests build-image
system-tests: ## @testing Runs the system tests
system-tests: prepare-tests ${BEAT_NAME}.test python-env
. ${PYTHON_ENV}/bin/activate; INTEGRATION_TESTS=${INTEGRATION_TESTS} TESTING_ENVIRONMENT=${TESTING_ENVIRONMENT} DOCKER_COMPOSE_PROJECT_NAME=${DOCKER_COMPOSE_PROJECT_NAME} nosetests ${PYTHON_TEST_FILES} ${NOSETESTS_OPTIONS}
python ${ES_BEATS}/dev-tools/aggregate_coverage.py -o ${COVERAGE_DIR}/system.cov ./build/system-tests/run
python ${ES_BEATS}/dev-tools/aggregate_coverage.py -o ${COVERAGE_DIR}/system.cov ${BUILD_DIR}/system-tests/run

# Runs the system tests
.PHONY: system-tests-environment
Expand Down
29 changes: 22 additions & 7 deletions libbeat/scripts/cmd/global_fields/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"

"github.com/elastic/beats/libbeat/generator/fields"
)
Expand Down Expand Up @@ -78,15 +79,29 @@ func main() {

var fieldsFiles []*fields.YmlFile
for _, fieldsFilePath := range beatFieldsPaths {
pathToModules := filepath.Join(beatPath, fieldsFilePath)
fullPath := filepath.Join(beatPath, fieldsFilePath)

fieldsFile, err := fields.CollectModuleFiles(pathToModules)
if err != nil {
fmt.Fprintf(os.Stderr, "Cannot collect fields.yml files: %+v\n", err)
os.Exit(2)
}
isFile := strings.HasSuffix(fullPath, ".yml")
if isFile {
file, err := fields.NewYmlFile(fullPath, 0)

if err != nil || file == nil {
fmt.Fprintf(os.Stderr, "Cannot collect fields.yml file: %+v\n", err)
os.Exit(2)
}

fieldsFiles = append(fieldsFiles, file)
} else {
// fullPath is a directory
files, err := fields.CollectModuleFiles(fullPath)

fieldsFiles = append(fieldsFiles, fieldsFile...)
if err != nil {
fmt.Fprintf(os.Stderr, "Cannot collect fields.yml files: %+v\n", err)
os.Exit(2)
}

fieldsFiles = append(fieldsFiles, files...)
}
}

err = fields.Generate(esBeatsPath, beatPath, fieldsFiles, output)
Expand Down
8 changes: 6 additions & 2 deletions metricbeat/tests/system/metricbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ class BaseTest(TestCase):

@classmethod
def setUpClass(self):
self.beat_name = "metricbeat"
self.beat_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
if not hasattr(self, 'beat_name'):
self.beat_name = "metricbeat"

if not hasattr(self, 'beat_path'):
self.beat_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious: why is this metricbeat test being modified? libbeat/tests/system/beat/beat.py doesn't appear to have been modified in this PR...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since Auditbeat with X-Pack is in its own xpack/auditbeat directory, beat_path needs to point to that. It was being overwritten since AuditbeatXPackTest in auditbeat_xpack.py ultimately extends BaseTest from metricbeat.py.

super(BaseTest, self).setUpClass()

def de_dot(self, existing_fields):
Expand Down
4 changes: 4 additions & 0 deletions x-pack/auditbeat/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/auditbeat
/auditbeat.test
/data
/fields.yml
28 changes: 28 additions & 0 deletions x-pack/auditbeat/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
BEAT_NAME=auditbeat
ES_BEATS=../..
XPACK_BEAT_PATH?=github.com/elastic/beats/x-pack/${BEAT_NAME}
GOPACKAGES=$(shell go list ${BEAT_PATH}/... ${XPACK_BEAT_PATH}/... | grep -v /vendor/ | grep -v /scripts/cmd/ )
FIND=. ${PYTHON_ENV}/bin/activate; find . ${ES_BEATS}/${BEAT_NAME} -type f -not -path "*/vendor/*" -not -path "*/build/*" -not -path "*/.git/*"

# Include main auditbeat Makefile
include ${ES_BEATS}/${BEAT_NAME}/Makefile

# Overwrite check-headers - check for Apache license in
# auditbeat/ and Elastic license in xpack/auditbeat/
.PHONY: check-headers
check-headers:
ifndef CHECK_HEADERS_DISABLED
@go get -u github.com/elastic/go-licenser
@go-licenser -d -license ${LICENSE} ${ES_BEATS}/${BEAT_NAME}
@go-licenser -d -license Elastic .
endif

# Overwrite check-headers - insert Apache license in
# auditbeat/ and Elastic license in xpack/auditbeat/
.PHONY: add-headers
add-headers:
ifndef CHECK_HEADERS_DISABLED
@go get github.com/elastic/go-licenser
@go-licenser -license ${LICENSE} ${ES_BEATS}/${BEAT_NAME}
@go-licenser -license Elastic .
endif
58 changes: 58 additions & 0 deletions x-pack/auditbeat/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cache

// Cache is just a map being used as a cache.
type Cache struct {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the Cache doesn't need a mutex? Is it always called only from a single go-routine? If yes, then it's fine.

@andrewkroh andrewkroh Oct 18, 2018

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fetch is always called from the same goroutine. But I don’t see much of a downside to make it thread safe now.

hashMap map[uint64]Cacheable
}

// Cacheable is the interface items stored in Cache need to implement.
type Cacheable interface {
Hash() uint64
}

// New creates a new cache.
func New() *Cache {
return &Cache{
hashMap: make(map[uint64]Cacheable),
}
}

// IsEmpty checks if the cache is empty.
func (cache *Cache) IsEmpty() bool {
return len(cache.hashMap) == 0
}

// DiffAndUpdateCache takes a list of new items to cache, compares them to the current
// cache contents, and returns both items new to the cache and items that are in the cache
// but missing in the new data.
func (cache *Cache) DiffAndUpdateCache(current []Cacheable) (new, missing []interface{}) {
// Check for and delete missing - what is no longer in current that was in the cache
for cacheKey, cacheValue := range cache.hashMap {
found := false
for _, currentValue := range current {
if currentValue.Hash() == cacheKey {
found = true
break
}
}

if !found {
missing = append(missing, cacheValue)
delete(cache.hashMap, cacheKey)
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You appear to be doing an O(n^2) operation here (n before and after is roughly equal).

Since the new state (current) is all you need to keep in memory for the next tick, perhaps you can skip the updating of the existing hash map and save current as is for later. In other words, this isn't really a cache, but more of a "previous state".

Then you can find "new" and "missing" result sets by:

  • looping over all keys from current and searching the cache for each item based on key. Each "not found" is added to your "new" result set
  • looping over all keys from cache and searching the current (which would have to be a map as well, not a slice). Each "not found" is added to your "missing" result set.

Each of these loops is an O(n), and so is the work for building a map out of current.

Note that you can check for presence by assigning to a second, optional var when searching the map:

_, found := cache[key]

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

current is not a map, so I would need to build a new map and add all current items and their hashes. Doing that every time we check the cache (e.g. every second) seems pretty expensive - most of the time nothing will have changed, or only one or two items. Am I missing something?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted to loop back here. I won't push too hard on the O(n^2), since I'm a total Go noob, so I may be missing something in your code or in the Map initialization.

But my computer currently runs 500-ish processes. This means each time it's comparing the new list of processes with the last state, it's doing a multiple of 250 000 operations (500^2) instead of doing a multiple of 500 operations... This may be irrelevant, since typically servers run way less stuff than workstations.

So I just wanted to put that out there, but we can keep things as they are for now. This may be premature optimization.


// Check for new - what is in current but not in cache
for _, currentValue := range current {
if _, contains := cache.hashMap[currentValue.Hash()]; !contains {
new = append(new, currentValue)
cache.hashMap[currentValue.Hash()] = currentValue
}
}

return
}
55 changes: 55 additions & 0 deletions x-pack/auditbeat/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cache

import (
"testing"

"github.com/OneOfOne/xxhash"
"github.com/stretchr/testify/assert"
)

type CacheTestItem struct {
s string
}

func (item CacheTestItem) Hash() uint64 {
h := xxhash.New64()
h.WriteString(item.s)
return h.Sum64()
}

func TestCache(t *testing.T) {
c := New()

assert.True(t, c.IsEmpty())

oldItems := []Cacheable{
CacheTestItem{"item1"},
CacheTestItem{"item2"},
}

newItems := []Cacheable{
CacheTestItem{"item1"},
CacheTestItem{"item3"},
}

new, missing := c.DiffAndUpdateCache(oldItems)

assert.Equal(t, 2, len(new))
assert.Equal(t, 0, len(missing))
assert.False(t, c.IsEmpty())

new, missing = c.DiffAndUpdateCache(newItems)

assert.Equal(t, 1, len(new))
assert.Equal(t, 1, len(missing))

new, missing = c.DiffAndUpdateCache([]Cacheable{})

assert.Equal(t, 0, len(new))
assert.Equal(t, 2, len(missing))
assert.True(t, c.IsEmpty())
}
2 changes: 2 additions & 0 deletions x-pack/auditbeat/include/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ import (
// Include all Auditbeat modules so that they register their
// factories with the global registry.
_ "github.com/elastic/beats/x-pack/auditbeat/module/system/host"
_ "github.com/elastic/beats/x-pack/auditbeat/module/system/packages"
_ "github.com/elastic/beats/x-pack/auditbeat/module/system/processes"
)
Loading