diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 6a2107d8eef8..a8f92f84c128 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -366,6 +366,15 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { return err } + // Try to acquire exclusive lock on data path to prevent another beat instance + // sharing same data path. + bl := newLocker(b) + err = bl.lock() + if err != nil { + return err + } + defer bl.unlock() + // Set Beat ID in registry vars, in case it was loaded from meta file infoRegistry := monitoring.GetNamespace("info").GetRegistry() monitoring.NewString(infoRegistry, "uuid").Set(b.Info.ID.String()) diff --git a/libbeat/cmd/instance/locker.go b/libbeat/cmd/instance/locker.go new file mode 100644 index 000000000000..da91d97bc68b --- /dev/null +++ b/libbeat/cmd/instance/locker.go @@ -0,0 +1,76 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package instance + +import ( + "os" + + "github.com/pkg/errors" + flock "github.com/theckman/go-flock" + + "github.com/elastic/beats/libbeat/paths" +) + +var ( + // ErrAlreadyLocked is returned when a lock on the data path is attempted but + // unsuccessful because another Beat instance already has the lock on the same + // data path. + ErrAlreadyLocked = errors.New("data path already locked by another beat") +) + +type locker struct { + fl *flock.Flock +} + +func newLocker(b *Beat) *locker { + lockfilePath := paths.Resolve(paths.Data, b.Info.Beat+".lock") + return &locker{ + fl: flock.NewFlock(lockfilePath), + } +} + +// lock attemps to acquire a lock on the data path for the currently-running +// Beat instance. If another Beats instance already has a lock on the same data path +// an ErrAlreadyLocked error is returned. +func (l *locker) lock() error { + isLocked, err := l.fl.TryLock() + if err != nil { + return errors.Wrap(err, "unable to lock data path") + } + + if !isLocked { + return ErrAlreadyLocked + } + + return nil +} + +// unlock attempts to release the lock on a data path previously acquired via Lock(). +func (l *locker) unlock() error { + err := l.fl.Unlock() + if err != nil { + return errors.Wrap(err, "unable to unlock data path") + } + + err = os.Remove(l.fl.Path()) + if err != nil { + return errors.Wrap(err, "unable to unlock data path") + } + + return nil +} diff --git a/libbeat/cmd/instance/locker_test.go b/libbeat/cmd/instance/locker_test.go new file mode 100644 index 000000000000..e6feab0e7fed --- /dev/null +++ b/libbeat/cmd/instance/locker_test.go @@ -0,0 +1,67 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package instance + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/paths" +) + +// TestLocker tests that two beats pointing to the same data path cannot +// acquire the same lock. +func TestLocker(t *testing.T) { + // Setup temporary data folder for test + clean it up at end of test + tmpDataDir, err := ioutil.TempDir("", "data") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDataDir) + + origDataPath := paths.Paths.Data + defer func() { + paths.Paths.Data = origDataPath + }() + paths.Paths.Data = tmpDataDir + + // Setup two beats with same name and data path + const beatName = "testbeat" + + b1 := &Beat{} + b1.Info.Beat = beatName + + b2 := &Beat{} + b2.Info.Beat = beatName + + // Try to get a lock for the first beat. Expect it to succeed. + bl1 := newLocker(b1) + err = bl1.lock() + assert.NoError(t, err) + + // Try to get a lock for the second beat. Expect it to fail because the + // first beat already has the lock. + bl2 := newLocker(b2) + err = bl2.lock() + assert.EqualError(t, err, ErrAlreadyLocked.Error()) +}