Skip to content

Commit

Permalink
[receiver/windowseventlogreceiver] Add remote log collection support (#…
Browse files Browse the repository at this point in the history
…33601)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
This PR adds remote log collection for the windows event log receiver. 
A config supports a single server configuration, multiple servers with
multiple credentials configuration and multiple servers with single
credentials configuration.

**Link to tracking Issue:** <Issue number if applicable>
#33100 

**Testing:** <Describe what testing was performed and which tests were
added.>
Added relevant test to the test files that existed.
This was tested on a single remote and with a valid / invalid remote to
ensure successful collection of logs. Gather local logs was also tested
to make sure old functionality is consistent too.

**Documentation:** <Describe the documentation added.>
Updated Read me documentation

---------

Co-authored-by: Daniel Jaglowski <[email protected]>
  • Loading branch information
JonathanWamsley and djaglowski authored Aug 7, 2024
1 parent 6a41efc commit 35fb4a2
Show file tree
Hide file tree
Showing 12 changed files with 487 additions and 28 deletions.
28 changes: 28 additions & 0 deletions .chloggen/windowseventlogreceiver-remote-collection.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: windowseventlogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add remote collection support to Stanza operator windows pkg to support remote log collect for the Windows Event Log receiver.


# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33100]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
23 changes: 23 additions & 0 deletions pkg/stanza/operator/input/windows/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,17 @@ var (
updateBookmarkProc SyscallProc = api.NewProc("EvtUpdateBookmark")
openPublisherMetadataProc SyscallProc = api.NewProc("EvtOpenPublisherMetadata")
formatMessageProc SyscallProc = api.NewProc("EvtFormatMessage")
openSessionProc SyscallProc = api.NewProc("EvtOpenSession")
)

type EvtRpcLogin struct {
Server *uint16
User *uint16
Domain *uint16
Password *uint16
Flags uint32
}

// SyscallProc is a syscall procedure.
type SyscallProc interface {
Call(...uintptr) (uintptr, uintptr, error)
Expand All @@ -38,6 +47,8 @@ const (
EvtSubscribeStartAtOldestRecord uint32 = 2
// EvtSubscribeStartAfterBookmark is a flag that will subscribe to all events that begin after a bookmark.
EvtSubscribeStartAfterBookmark uint32 = 3
// EvtRpcLoginClass is a flag that indicates the login class.
EvtRpcLoginClass uint32 = 1
)

const (
Expand Down Expand Up @@ -65,6 +76,8 @@ const (
EvtRenderBookmark uint32 = 2
)

var evtSubscribeFunc = evtSubscribe

// evtSubscribe is the direct syscall implementation of EvtSubscribe (https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtsubscribe)
func evtSubscribe(session uintptr, signalEvent windows.Handle, channelPath *uint16, query *uint16, bookmark uintptr, context uintptr, callback uintptr, flags uint32) (uintptr, error) {
handle, _, err := subscribeProc.Call(session, uintptr(signalEvent), uintptr(unsafe.Pointer(channelPath)), uintptr(unsafe.Pointer(query)), bookmark, context, callback, uintptr(flags))
Expand Down Expand Up @@ -147,3 +160,13 @@ func evtFormatMessage(publisherMetadata uintptr, event uintptr, messageID uint32

return bufferUsed, nil
}

// evtOpenSession is the direct syscall implementation of EvtOpenSession (https://learn.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtopensession)
func evtOpenSession(loginClass uint32, login *EvtRpcLogin, timeout uint32, flags uint32) (windows.Handle, error) {
r0, _, e1 := openSessionProc.Call(uintptr(loginClass), uintptr(unsafe.Pointer(login)), uintptr(timeout), uintptr(flags))
handle := windows.Handle(r0)
if handle == 0 {
return handle, e1
}
return handle, nil
}
9 changes: 9 additions & 0 deletions pkg/stanza/operator/input/windows/config_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,13 @@ type Config struct {
PollInterval time.Duration `mapstructure:"poll_interval,omitempty"`
Raw bool `mapstructure:"raw,omitempty"`
ExcludeProviders []string `mapstructure:"exclude_providers,omitempty"`
Remote RemoteConfig `mapstructure:"remote,omitempty"`
}

// RemoteConfig is the configuration for a remote server.
type RemoteConfig struct {
Server string `mapstructure:"server"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Domain string `mapstructure:"domain,omitempty"`
}
13 changes: 11 additions & 2 deletions pkg/stanza/operator/input/windows/config_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ func (c *Config) Build(set component.TelemetrySettings) (operator.Operator, erro
return nil, fmt.Errorf("the `start_at` field must be set to `beginning` or `end`")
}

return &Input{
if (c.Remote.Server != "" || c.Remote.Username != "" || c.Remote.Password != "") && // any not empty
(c.Remote.Server == "" || c.Remote.Username == "" || c.Remote.Password == "") { // any empty
return nil, fmt.Errorf("remote configuration must have non-empty `username` and `password`")
}

input := &Input{
InputOperator: inputOperator,
buffer: NewBuffer(),
channel: c.Channel,
Expand All @@ -45,5 +50,9 @@ func (c *Config) Build(set component.TelemetrySettings) (operator.Operator, erro
pollInterval: c.PollInterval,
raw: c.Raw,
excludeProviders: c.ExcludeProviders,
}, nil
remote: c.Remote,
}
input.startRemoteSession = input.defaultStartRemoteSession

return input, nil
}
138 changes: 118 additions & 20 deletions pkg/stanza/operator/input/windows/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ package windows // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"context"
"errors"
"fmt"
"sync"
"time"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"golang.org/x/sys/windows"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand All @@ -20,19 +23,79 @@ import (
// Input is an operator that creates entries using the windows event log api.
type Input struct {
helper.InputOperator
bookmark Bookmark
subscription Subscription
buffer Buffer
channel string
maxReads int
startAt string
raw bool
excludeProviders []string
pollInterval time.Duration
persister operator.Persister
publisherCache publisherCache
cancel context.CancelFunc
wg sync.WaitGroup
bookmark Bookmark
buffer Buffer
channel string
maxReads int
startAt string
raw bool
excludeProviders []string
pollInterval time.Duration
persister operator.Persister
publisherCache publisherCache
cancel context.CancelFunc
wg sync.WaitGroup
subscription Subscription
remote RemoteConfig
remoteSessionHandle windows.Handle
startRemoteSession func() error
}

// newInput creates a new Input operator.
func newInput(settings component.TelemetrySettings) *Input {
basicConfig := helper.NewBasicConfig("windowseventlog", "input")
basicOperator, _ := basicConfig.Build(settings)

input := &Input{
InputOperator: helper.InputOperator{
WriterOperator: helper.WriterOperator{
BasicOperator: basicOperator,
},
},
}
input.startRemoteSession = input.defaultStartRemoteSession
return input
}

// defaultStartRemoteSession starts a remote session for reading event logs from a remote server.
func (i *Input) defaultStartRemoteSession() error {
if i.remote.Server == "" {
return nil
}

login := EvtRpcLogin{
Server: windows.StringToUTF16Ptr(i.remote.Server),
User: windows.StringToUTF16Ptr(i.remote.Username),
Password: windows.StringToUTF16Ptr(i.remote.Password),
}

sessionHandle, err := evtOpenSession(EvtRpcLoginClass, &login, 0, 0)
if err != nil {
return fmt.Errorf("failed to open session for server %s: %w", i.remote.Server, err)
}
i.remoteSessionHandle = sessionHandle
return nil
}

// stopRemoteSession stops the remote session if it is active.
func (i *Input) stopRemoteSession() error {
if i.remoteSessionHandle != 0 {
if err := evtClose(uintptr(i.remoteSessionHandle)); err != nil {
return fmt.Errorf("failed to close remote session handle for server %s: %w", i.remote.Server, err)
}
i.remoteSessionHandle = 0
}
return nil
}

// isRemote checks if the input is configured for remote access.
func (i *Input) isRemote() bool {
return i.remote.Server != ""
}

// isNonTransientError checks if the error is likely non-transient.
func isNonTransientError(err error) bool {
return errors.Is(err, windows.ERROR_EVT_CHANNEL_NOT_FOUND) || errors.Is(err, windows.ERROR_ACCESS_DENIED)
}

// Start will start reading events from a subscription.
Expand All @@ -42,10 +105,15 @@ func (i *Input) Start(persister operator.Persister) error {

i.persister = persister

if i.isRemote() {
if err := i.startRemoteSession(); err != nil {
return fmt.Errorf("failed to start remote session for server %s: %w", i.remote.Server, err)
}
}

i.bookmark = NewBookmark()
offsetXML, err := i.getBookmarkOffset(ctx)
if err != nil {
i.Logger().Error("Failed to open bookmark, continuing without previous bookmark", zap.Error(err))
_ = i.persister.Delete(ctx, i.channel)
}

Expand All @@ -55,15 +123,31 @@ func (i *Input) Start(persister operator.Persister) error {
}
}

i.subscription = NewSubscription()
if err := i.subscription.Open(i.channel, i.startAt, i.bookmark); err != nil {
return fmt.Errorf("failed to open subscription: %w", err)
i.publisherCache = newPublisherCache()

subscription := NewLocalSubscription()
if i.isRemote() {
subscription = NewRemoteSubscription(i.remote.Server)
}

i.publisherCache = newPublisherCache()
if err := subscription.Open(i.startAt, uintptr(i.remoteSessionHandle), i.channel, i.bookmark); err != nil {
if isNonTransientError(err) {
if i.isRemote() {
return fmt.Errorf("failed to open subscription for remote server %s: %w", i.remote.Server, err)
}
return fmt.Errorf("failed to open local subscription: %w", err)
}
if i.isRemote() {
i.Logger().Warn("Transient error opening subscription for remote server, continuing", zap.String("server", i.remote.Server), zap.Error(err))
} else {
i.Logger().Warn("Transient error opening local subscription, continuing", zap.Error(err))
}
}

i.subscription = subscription
i.wg.Add(1)
go i.readOnInterval(ctx)

return nil
}

Expand All @@ -84,7 +168,7 @@ func (i *Input) Stop() error {
return fmt.Errorf("failed to close publishers: %w", err)
}

return nil
return i.stopRemoteSession()
}

// readOnInterval will read events with respect to the polling interval.
Expand Down Expand Up @@ -112,6 +196,15 @@ func (i *Input) readToEnd(ctx context.Context) {
return
default:
if count := i.read(ctx); count == 0 {
if i.isRemote() {
if err := i.startRemoteSession(); err != nil {
i.Logger().Error("Failed to re-establish remote session", zap.String("server", i.remote.Server), zap.Error(err))
return
}
if err := i.subscription.Open(i.startAt, uintptr(i.remoteSessionHandle), i.channel, i.bookmark); err != nil {
i.Logger().Error("Failed to re-open subscription for remote server", zap.String("server", i.remote.Server), zap.Error(err))
}
}
return
}
}
Expand Down Expand Up @@ -139,6 +232,8 @@ func (i *Input) read(ctx context.Context) int {

// processEvent will process and send an event retrieved from windows event log.
func (i *Input) processEvent(ctx context.Context, event Event) {
remoteServer := i.remote.Server

if i.raw {
if len(i.excludeProviders) > 0 {
simpleEvent, err := event.RenderSimple(i.buffer)
Expand All @@ -159,6 +254,7 @@ func (i *Input) processEvent(ctx context.Context, event Event) {
i.Logger().Error("Failed to render raw event", zap.Error(err))
return
}
rawEvent.RemoteServer = remoteServer
i.sendEventRaw(ctx, rawEvent)
return
}
Expand All @@ -167,6 +263,7 @@ func (i *Input) processEvent(ctx context.Context, event Event) {
i.Logger().Error("Failed to render simple event", zap.Error(err))
return
}
simpleEvent.RemoteServer = remoteServer

for _, excludeProvider := range i.excludeProviders {
if simpleEvent.Provider.Name == excludeProvider {
Expand All @@ -192,7 +289,7 @@ func (i *Input) processEvent(ctx context.Context, event Event) {
i.sendEvent(ctx, simpleEvent)
return
}

formattedEvent.RemoteServer = remoteServer
i.sendEvent(ctx, formattedEvent)
}

Expand All @@ -210,6 +307,7 @@ func (i *Input) sendEvent(ctx context.Context, eventXML EventXML) {
i.Write(ctx, entry)
}

// sendEventRaw will send EventRaw as an entry to the operator's output.
func (i *Input) sendEventRaw(ctx context.Context, eventRaw EventRaw) {
body := eventRaw.parseBody()
entry, err := i.NewEntry(body)
Expand Down
Loading

0 comments on commit 35fb4a2

Please sign in to comment.