Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Ensure netflow custom field configuration is applied. {issue}40735[40735] {pull}40730[40730]
- Fix replace processor handling of zero string replacement validation. {pull}40751[40751]
- Fix long filepaths in diagnostics exceeding max path limits on Windows. {pull}40909[40909]
- Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015]


*Heartbeat*
Expand Down
100 changes: 64 additions & 36 deletions x-pack/filebeat/input/salesforce/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (s *salesforceInput) Setup(env v2.Context, src inputcursor.Source, cursor *
// and based on the configuration, it will run the different methods -- EventLogFile
// or Object to collect events at defined intervals.
func (s *salesforceInput) run() error {
s.log.Info("Starting Salesforce input run")
if s.srcConfig.EventMonitoringMethod.EventLogFile.isEnabled() {
err := s.RunEventLogFile()
if err != nil {
Expand Down Expand Up @@ -160,12 +161,18 @@ func (s *salesforceInput) run() error {
case <-s.ctx.Done():
return s.isError(s.ctx.Err())
case <-eventLogFileTicker.C:
s.log.Info("Running EventLogFile collection")
if err := s.RunEventLogFile(); err != nil {
s.log.Errorf("Problem running EventLogFile collection: %s", err)
} else {
s.log.Info("EventLogFile collection completed successfully")
}
case <-objectMethodTicker.C:
s.log.Info("Running Object collection")
if err := s.RunObject(); err != nil {
s.log.Errorf("Problem running Object collection: %s", err)
} else {
s.log.Info("Object collection completed successfully")
}
}
}
Expand All @@ -181,15 +188,17 @@ func (s *salesforceInput) isError(err error) error {
}

func (s *salesforceInput) SetupSFClientConnection() (*soql.Resource, error) {
s.log.Info("Setting up Salesforce client connection")
if s.sfdcConfig == nil {
return nil, errors.New("internal error: salesforce configuration is not set properly")
}

// Open creates a session using the configuration.
session, err := session.Open(*s.sfdcConfig)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to open salesforce connection: %w", err)
}
s.log.Info("Salesforce session opened successfully")

// Set clientSession for re-use.
s.clientSession = session
Expand All @@ -209,8 +218,6 @@ func (s *salesforceInput) FormQueryWithCursor(queryConfig *QueryConfig, cursor m
return nil, err
}

s.log.Infof("Salesforce query: %s", qr)

return &querier{Query: qr}, err
}

Expand All @@ -222,7 +229,7 @@ func isZero[T comparable](v T) bool {

// RunObject runs the Object method of the Event Monitoring API to collect events.
func (s *salesforceInput) RunObject() error {
s.log.Debugf("scrape object(s) every %s", s.srcConfig.EventMonitoringMethod.Object.Interval)
s.log.Infof("Running Object collection with interval: %s", s.srcConfig.EventMonitoringMethod.Object.Interval)

var cursor mapstr.M
if !(isZero(s.cursor.Object.FirstEventTime) && isZero(s.cursor.Object.LastEventTime)) {
Expand All @@ -241,6 +248,8 @@ func (s *salesforceInput) RunObject() error {
return fmt.Errorf("error forming query based on cursor: %w", err)
}

s.log.Infof("Query formed: %s", query.Query)

res, err := s.soqlr.Query(query, false)
if err != nil {
return err
Expand Down Expand Up @@ -282,15 +291,15 @@ func (s *salesforceInput) RunObject() error {
return err
}
}
s.log.Debugf("Total events: %d", totalEvents)
s.log.Infof("Total events: %d", totalEvents)

return nil
}

// RunEventLogFile runs the EventLogFile method of the Event Monitoring API to
// collect events.
func (s *salesforceInput) RunEventLogFile() error {
s.log.Debugf("scrape eventLogFile(s) every %s", s.srcConfig.EventMonitoringMethod.EventLogFile.Interval)
s.log.Infof("Running EventLogFile collection with interval: %s", s.srcConfig.EventMonitoringMethod.EventLogFile.Interval)

var cursor mapstr.M
if !(isZero(s.cursor.EventLogFile.FirstEventTime) && isZero(s.cursor.EventLogFile.LastEventTime)) {
Expand All @@ -309,6 +318,8 @@ func (s *salesforceInput) RunEventLogFile() error {
return fmt.Errorf("error forming query based on cursor: %w", err)
}

s.log.Infof("Query formed: %s", query.Query)

res, err := s.soqlr.Query(query, false)
if err != nil {
return err
Expand All @@ -324,9 +335,14 @@ func (s *salesforceInput) RunEventLogFile() error {
totalEvents, firstEvent := 0, true
for res.TotalSize() > 0 {
for _, rec := range res.Records() {
req, err := http.NewRequestWithContext(s.ctx, http.MethodGet, s.config.URL+rec.Record().Fields()["LogFile"].(string), nil)
logfile, ok := rec.Record().Fields()["LogFile"].(string)
if !ok {
return fmt.Errorf("LogFile field not found or not a string in Salesforce event log file: %v", rec.Record().Fields())
}

req, err := http.NewRequestWithContext(s.ctx, http.MethodGet, s.config.URL+logfile, nil)
if err != nil {
return err
return fmt.Errorf("error creating request for log file: %w", err)
}

s.clientSession.AuthorizationHeader(req)
Expand All @@ -341,19 +357,23 @@ func (s *salesforceInput) RunEventLogFile() error {

resp, err := s.sfdcConfig.Client.Do(req)
if err != nil {
return err
return fmt.Errorf("error fetching log file: %w", err)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
return err
return fmt.Errorf("unexpected status code %d for log file", resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return fmt.Errorf("error reading log file body: %w", err)
}

recs, err := decodeAsCSV(body)
recs, err := s.decodeAsCSV(body)
if err != nil {
return err
return fmt.Errorf("error decoding CSV: %w", err)
}

if timestamp, ok := rec.Record().Fields()[s.config.EventMonitoringMethod.EventLogFile.Cursor.Field].(string); ok {
Expand All @@ -366,12 +386,11 @@ func (s *salesforceInput) RunEventLogFile() error {
for _, val := range recs {
jsonStrEvent, err := json.Marshal(val)
if err != nil {
return err
return fmt.Errorf("error json marshaling event: %w", err)
}

err = publishEvent(s.publisher, s.cursor, jsonStrEvent, "EventLogFile")
if err != nil {
return err
if err := publishEvent(s.publisher, s.cursor, jsonStrEvent, "EventLogFile"); err != nil {
return fmt.Errorf("error publishing event: %w", err)
}
totalEvents++
}
Expand All @@ -384,10 +403,10 @@ func (s *salesforceInput) RunEventLogFile() error {

res, err = res.Next()
if err != nil {
return err
return fmt.Errorf("error getting next page: %w", err)
}
}
s.log.Debugf("Total events: %d", totalEvents)
s.log.Infof("Total events processed: %d", totalEvents)

return nil
}
Expand All @@ -405,6 +424,7 @@ func (s *salesforceInput) getSFDCConfig(cfg *config) (*sfdc.Configuration, error

switch {
case cfg.Auth.OAuth2.JWTBearerFlow != nil && cfg.Auth.OAuth2.JWTBearerFlow.isEnabled():
s.log.Info("Using JWT Bearer Flow for authentication")
pemBytes, err := os.ReadFile(cfg.Auth.OAuth2.JWTBearerFlow.ClientKeyPath)
if err != nil {
return nil, fmt.Errorf("problem with client key path for JWT auth: %w", err)
Expand All @@ -428,6 +448,7 @@ func (s *salesforceInput) getSFDCConfig(cfg *config) (*sfdc.Configuration, error
}

case cfg.Auth.OAuth2.UserPasswordFlow != nil && cfg.Auth.OAuth2.UserPasswordFlow.isEnabled():
s.log.Info("Using User Password Flow for authentication")
passCreds := credentials.PasswordCredentials{
URL: cfg.Auth.OAuth2.UserPasswordFlow.TokenURL,
Username: cfg.Auth.OAuth2.UserPasswordFlow.Username,
Expand Down Expand Up @@ -533,21 +554,29 @@ type textContextError struct {
body []byte
}

// decodeAsCSV decodes p as a headed CSV document into dst.
func decodeAsCSV(p []byte) ([]map[string]string, error) {
// decodeAsCSV decodes the provided byte slice as a CSV and returns a slice of
// maps, where each map represents a row in the CSV with the header fields as
// keys and the row values as values.
func (s *salesforceInput) decodeAsCSV(p []byte) ([]map[string]string, error) {
r := csv.NewReader(bytes.NewReader(p))

// To share the backing array for performance.
r.ReuseRecord = true

// Lazy quotes are enabled to allow for quoted fields with commas. More flexible
// in handling CSVs.
// NOTE(shmsr): Although, we didn't face any issue with LazyQuotes == false, but I
// think we should keep it enabled to avoid any issues in the future.
r.LazyQuotes = true

// Header row is always expected, otherwise we can't map values to keys in
// the event.
header, err := r.Read()
if err != nil {
if err == io.EOF { //nolint:errorlint // csv.Reader never wraps io.EOF.
if errors.Is(err, io.EOF) {
return nil, nil
}
return nil, err
return nil, fmt.Errorf("failed to read CSV header: %w", err)
}

// As buffer reuse is enabled, copying header is important.
Expand All @@ -561,22 +590,21 @@ func decodeAsCSV(p []byte) ([]map[string]string, error) {
// so that future records must have the same field count.
// So, if len(header) != len(event), the Read will return an error and hence
// we need not put an explicit check.
event, err := r.Read()
for ; err == nil; event, err = r.Read() {
for {
record, err := r.Read()
if err != nil {
continue
}
o := make(map[string]string, len(header))
for i, h := range header {
o[h] = event[i]
if errors.Is(err, io.EOF) {
break
}
s.log.Errorf("failed to read CSV record: %v\n%s", err, p)
return nil, textContextError{error: fmt.Errorf("failed to read CSV record: %w for: %v", err, record), body: p}
}
results = append(results, o)
}

if err != nil {
if err != io.EOF { //nolint:errorlint // csv.Reader never wraps io.EOF.
return nil, textContextError{error: err, body: p}
event := make(map[string]string, len(header))
for i, h := range header {
event[h] = record[i]
}
results = append(results, event)
}

return results, nil
Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/salesforce/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,9 @@ func TestDecodeAsCSV(t *testing.T) {
"Login","20231218054831.655","4u6LyuMrDvb_G-l1cJIQk-","00D5j00000DgAYG","0055j00000AT6I1","1219","127","/services/oauth2/token","","bY5Wfv8t/Ith7WVE","Standard","","1051271151","i","Go-http-client/1.1","","9998.0","salesforceinstance@devtest.in","TLSv1.2","ECDHE-RSA-AES256-GCM-SHA384","","","2023-12-18T05:48:31.655Z","0055j00000AT6I1AAL","Salesforce.com IP","","LOGIN_NO_ERROR","103.108.207.58"
"Login","20231218054832.003","4u6LyuHSDv8LLVl1cJOqGV","00D5j00000DgAYG","0055j00000AT6I1","1277","104","/services/oauth2/token","","u60el7VqW8CSSKcW","Standard","","674857427","i","Go-http-client/1.1","","9998.0","salesforceinstance@devtest.in","TLSv1.2","ECDHE-RSA-AES256-GCM-SHA384","","","2023-12-18T05:48:32.003Z","0055j00000AT6I1AAL","103.108.207.58","","LOGIN_NO_ERROR","103.108.207.58"`

mp, err := decodeAsCSV([]byte(sampleELF))
s := &salesforceInput{log: logp.NewLogger("salesforceInput")}

mp, err := s.decodeAsCSV([]byte(sampleELF))
assert.NoError(t, err)

wantNumOfEvents := 2
Expand Down