Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func handleSQSMessage(m sqs.Message) ([]s3Info, error) {
// Unescape substrings from s3 log name. For example, convert "%3D" back to "="
filename, err := url.QueryUnescape(record.S3.object.Key)
if err != nil {
return nil, errors.Wrapf(err, "url.QueryUnescape failed")
return nil, errors.Wrapf(err, "url.QueryUnescape failed for '%s'", record.S3.object.Key)
}

s3Infos = append(s3Infos, s3Info{
Expand Down Expand Up @@ -419,17 +419,17 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
// If the SDK can determine the request or retry delay was canceled
// by a context the ErrCodeRequestCanceled error will be returned.
if awsErr.Code() == awssdk.ErrCodeRequestCanceled {
err = errors.Wrap(err, "S3 GetObjectRequest canceled")
err = errors.Wrapf(err, "S3 GetObjectRequest canceled for '%s' from S3 bucket '%s'", info.key, info.name)
p.logger.Error(err)
return err
}

if awsErr.Code() == "NoSuchKey" {
p.logger.Warn("Cannot find s3 file")
p.logger.Warnf("Cannot find s3 file '%s' from S3 bucket '%s'", info.key, info.name)
return nil
}
}
return errors.Wrap(err, "S3 GetObjectRequest failed")
return errors.Wrapf(err, "S3 GetObjectRequest failed for '%s' from S3 bucket '%s'", info.key, info.name)
}

defer resp.Body.Close()
Expand All @@ -448,7 +448,7 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
decoder := json.NewDecoder(reader)
err := p.decodeJSONWithKey(decoder, objectHash, info, s3Ctx)
if err != nil {
err = errors.Wrap(err, "decodeJSONWithKey failed")
err = errors.Wrapf(err, "decodeJSONWithKey failed for '%s' from S3 bucket '%s'", info.key, info.name)
p.logger.Error(err)
return err
}
Expand All @@ -459,7 +459,7 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
if (resp.ContentType != nil && *resp.ContentType == "application/x-gzip") || strings.HasSuffix(info.key, ".gz") {
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
err = errors.Wrap(err, "gzip.NewReader failed")
err = errors.Wrapf(err, "gzip.NewReader failed for '%s' from S3 bucket '%s'", info.key, info.name)
p.logger.Error(err)
return err
}
Expand Down Expand Up @@ -519,36 +519,37 @@ func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3
// get logs from expand_event_list_from_field
textValues, ok := jsonFields[p.config.ExpandEventListFromField]
if !ok {
err = errors.Wrapf(err, fmt.Sprintf("key '%s' not found", p.config.ExpandEventListFromField))
err = errors.Wrapf(err, "key '%s' not found", p.config.ExpandEventListFromField)
p.logger.Error(err)
return err
}

for _, v := range textValues {
err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx)
if err != nil {
err = errors.Wrap(err, "convertJSONToEvent failed")
err = errors.Wrapf(err, "convertJSONToEvent failed for '%s' from S3 bucket '%s'", s3Info.key, s3Info.name)
p.logger.Error(err)
return err
}
}
} else if err != nil {
// decode json failed, skip this log file
p.logger.Warnf(fmt.Sprintf("Decode json failed for '%s', skipping this file: %s", s3Info.key, err))
err = errors.Wrapf(err, "decode json failed for '%s' from S3 bucket '%s', skipping this file", s3Info.key, s3Info.name)
p.logger.Warn(err)
return nil
}

textValues, ok := jsonFields[p.config.ExpandEventListFromField]
if !ok {
err = errors.Wrapf(err, fmt.Sprintf("Key '%s' not found", p.config.ExpandEventListFromField))
err = errors.Wrapf(err, "Key '%s' not found", p.config.ExpandEventListFromField)
p.logger.Error(err)
return err
}

for _, v := range textValues {
err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, fmt.Sprintf("Key '%s' not found", p.config.ExpandEventListFromField))
err = errors.Wrapf(err, "Key '%s' not found", p.config.ExpandEventListFromField)
p.logger.Error(err)
return err
}
Expand All @@ -564,7 +565,7 @@ func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectH

err = p.forwardEvent(event)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("forwardEvent failed"))
err = errors.Wrap(err, "forwardEvent failed")
p.logger.Error(err)
return err
}
Expand Down Expand Up @@ -596,7 +597,7 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == awssdk.ErrCodeRequestCanceled {
return nil
}
return errors.Wrap(err, "SQS DeleteMessageRequest failed")
return errors.Wrapf(err, "SQS DeleteMessageRequest failed in queue %s", queueURL)
}
return nil
}
Expand Down