Skip to content
Merged
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (

"github.com/elastic/beats/v7/filebeat/input/filestream/internal/task"
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/tests/resources"
"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemock"
"github.com/elastic/elastic-agent-libs/logp"
)

Expand Down Expand Up @@ -393,7 +393,7 @@ func TestDefaultHarvesterGroup(t *testing.T) {
func testDefaultHarvesterGroup(t *testing.T, mockHarvester Harvester) *defaultHarvesterGroup {
return &defaultHarvesterGroup{
readers: newReaderGroup(),
pipeline: &pipelinemock.MockPipelineConnector{},
pipeline: &MockPipeline{},
harvester: mockHarvester,
store: testOpenStore(t, "test", nil),
identifier: &sourceIdentifier{"filestream::.global::"},
Expand Down Expand Up @@ -465,3 +465,71 @@ func (tl *testLogger) Errorf(format string, args ...interface{}) {
func (tl *testLogger) String() string {
return (*strings.Builder)(tl).String()
}

// MockClient is a mock implementation of the beat.Client interface.
type MockClient struct {
published []beat.Event // Slice to store published events

closed bool // Flag to indicate if the client is closed
mu sync.Mutex // Mutex to synchronize access to the published events slice
}

// GetEvents returns all the events published by the mock client.
func (m *MockClient) GetEvents() []beat.Event {
m.mu.Lock()
defer m.mu.Unlock()

return m.published
}

// Publish publishes a single event.
func (m *MockClient) Publish(e beat.Event) {
es := make([]beat.Event, 1)
es = append(es, e)

m.PublishAll(es)
}

// PublishAll publishes multiple events.
func (m *MockClient) PublishAll(es []beat.Event) {
m.mu.Lock()
defer m.mu.Unlock()

m.published = append(m.published, es...)
}

// Close closes the mock client.
func (m *MockClient) Close() error {
m.mu.Lock()
defer m.mu.Unlock()

if m.closed {
return fmt.Errorf("mock already closed")
}

m.closed = true
return nil
}

// MockPipeline is a mock implementation of the beat.Pipeline interface.
type MockPipeline struct {
c beat.Client // Client used by the pipeline
mu sync.Mutex // Mutex to synchronize access to the client
}

// ConnectWith connects the mock pipeline with a client using the provided configuration.
func (mp *MockPipeline) ConnectWith(config beat.ClientConfig) (beat.Client, error) {
mp.mu.Lock()
defer mp.mu.Unlock()

c := &MockClient{}

mp.c = c

return c, nil
}

// Connect connects the mock pipeline with a client using the default configuration.
func (mp *MockPipeline) Connect() (beat.Client, error) {
return mp.ConnectWith(beat.ClientConfig{})
}