Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"os"
"runtime"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -736,3 +737,169 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
cancelInput()
env.waitUntilInputStops()
}

// test_symlinks_enabled from test_harvester.py
func TestFilestreamSymlinksEnabled(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
symlinkName := "test.log.symlink"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{
env.abspath(symlinkName),
},
"prospector.scanner.symlinks": "true",
})

testlines := []byte("first line\n")
env.mustWriteLinesToFile(testlogName, testlines)

env.mustSymlink(testlogName, symlinkName)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(1)

cancelInput()
env.waitUntilInputStops()

env.requireOffsetInRegistry(testlogName, len(testlines))
}

// test_symlink_rotated from test_harvester.py
func TestFilestreamSymlinkRotated(t *testing.T) {
env := newInputTestingEnvironment(t)

firstTestlogName := "test1.log"
secondTestlogName := "test2.log"
symlinkName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{
env.abspath(symlinkName),
},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.symlinks": "true",
"close.on_state_change.removed": "false",
"clean_removed": "false",
})

commonLine := "first line in file "
for i, path := range []string{firstTestlogName, secondTestlogName} {
env.mustWriteLinesToFile(path, []byte(commonLine+strconv.Itoa(i)+"\n"))
}

env.mustSymlink(firstTestlogName, symlinkName)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(1)

expectedOffset := len(commonLine) + 2
env.requireOffsetInRegistry(firstTestlogName, expectedOffset)

// rotate symlink
env.mustRemoveFile(symlinkName)
env.mustSymlink(secondTestlogName, symlinkName)

moreLines := "second line in file 2\nthird line in file 2\n"
env.mustAppendLinesToFile(secondTestlogName, []byte(moreLines))

env.waitUntilEventCount(4)
env.requireOffsetInRegistry(firstTestlogName, expectedOffset)
env.requireOffsetInRegistry(secondTestlogName, expectedOffset+len(moreLines))

cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(2)
}

// test_symlink_removed from test_harvester.py
func TestFilestreamSymlinkRemoved(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
symlinkName := "test.log.symlink"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{
env.abspath(symlinkName),
},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.symlinks": "true",
"close.on_state_change.removed": "false",
"clean_removed": "false",
})

line := []byte("first line\n")
env.mustWriteLinesToFile(testlogName, line)

env.mustSymlink(testlogName, symlinkName)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(1)

env.requireOffsetInRegistry(testlogName, len(line))

// remove symlink
env.mustRemoveFile(symlinkName)

env.mustAppendLinesToFile(testlogName, line)

env.waitUntilEventCount(2)
env.requireOffsetInRegistry(testlogName, 2*len(line))

cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(1)
}

// test_truncate from test_harvester.py
func TestFilestreamTruncate(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
symlinkName := "test.log.symlink"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{
env.abspath("*"),
},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.symlinks": "true",
})

lines := []byte("first line\nsecond line\nthird line\n")
env.mustWriteLinesToFile(testlogName, lines)

env.mustSymlink(testlogName, symlinkName)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(3)

env.requireOffsetInRegistry(testlogName, len(lines))

// remove symlink
env.mustRemoveFile(symlinkName)
env.mustTruncateFile(testlogName, 0)
env.waitUntilOffsetInRegistry(testlogName, 0)

// recreate symlink
env.mustSymlink(testlogName, symlinkName)

moreLines := []byte("forth line\nfifth line\n")
env.mustWriteLinesToFile(testlogName, moreLines)

env.waitUntilEventCount(5)
env.requireOffsetInRegistry(testlogName, len(moreLines))

cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(1)
}