[CR] Fail "lakefs expire" if errors occur during expiration
Tested by using IAM to fail CreateJob and seeing an appropriately
FATAL report.


Former-commit-id: a1e4ca93d287e7dc16fabad3134e31f3ae90cab8
arielshaqed committed Jul 30, 2020
1 parent dd78fbc commit 3ac0877
Showing 5 changed files with 266 additions and 87 deletions.
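In outline, the change turns retention.ExpireOnS3 into a producer that reports every failure on an error channel it closes when done, and makes the expire command drain that channel and exit fatally if anything failed. The following is a self-contained sketch of that shape only; the names are illustrative, not the lakeFS API, and the real wiring (including the per-repository failure count) is in cmd/lakefs/cmd/expire.go below.

package main

import (
    "errors"
    "fmt"
    "log"
)

// expireAll mirrors the shape of the new retention.ExpireOnS3: it does its work in a
// goroutine and returns a channel that receives every error and is closed when done.
func expireAll(repos []string) chan error {
    errCh := make(chan error, 100)
    go func() {
        defer close(errCh)
        for _, repo := range repos {
            if repo == "broken" { // stand-in for a failed S3 batch-tagging job
                errCh <- fmt.Errorf("expire %q: %w", repo, errors.New("CreateJob denied"))
            }
        }
    }()
    return errCh
}

func main() {
    numErrors := 0
    for err := range expireAll([]string{"good-repo", "broken"}) {
        log.Print(err)
        numErrors++
    }
    if numErrors > 0 {
        log.Fatalf("failed to expire; %d errors emitted above", numErrors)
    }
}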
23 changes: 20 additions & 3 deletions cmd/lakefs/cmd/expire.go
@@ -67,6 +67,7 @@ var expireCmd = &cobra.Command{
         // it is easier to understand separated logs, and safer to expire one repository
         // at a time.
         numFailures := 0
+        ok := true
         for _, repo := range repos {
             repoLogger := logger.WithFields(logging.Fields{
                 "repository": repo.Name,
@@ -75,26 +76,42 @@
             policy, err := retentionService.GetPolicy(repo.Name)
             if err != nil {
                 repoLogger.WithError(err).Error("failed to get retention policy (skip repo)")
+                numFailures++
                 continue
             }
             if policy == nil {
                 repoLogger.Info("no retention policy for this repository - skip")
+                // (not a failure)
                 continue
             }
             expiryRows, err := cataloger.QueryExpired(ctx, repo.Name, &policy.Policy)
             if err != nil {
                 repoLogger.WithError(err).Error("failed to query for expired (skip repo)")
+                numFailures++
                 continue
             }
             expiryReader, err := retention.WriteExpiryResultsToSeekableReader(ctx, expiryRows)
             if err != nil {
                 repoLogger.WithError(err).Error("failed to write expiry results (skip repo)")
+                numFailures++
                 continue
             }
-            retention.ExpireOnS3(ctx, s3ControlClient, s3Client, cataloger, expiryReader, &expiryParams)
+
+            errCh := retention.ExpireOnS3(ctx, s3ControlClient, s3Client, cataloger, expiryReader, &expiryParams)
+
+            repoOk := true
+            for err := range errCh {
+                repoOk = false
+                repoLogger.Error(err)
+            }
+            if !repoOk {
+                numFailures++
+                ok = false
+                continue
+            }
         }
-        if numFailures > 0 {
-            logger.Fatalf("Configuration issues found in %d repositories", numFailures)
+        if numFailures > 0 || !ok {
+            logger.Fatalf("Failed to expire on %d repositories; errors emitted above", numFailures)
         }
     },
     Hidden: true,
179 changes: 96 additions & 83 deletions retention/expiry.go
@@ -252,96 +252,109 @@ type ExpireOnS3Params struct {
     ManifestUrlForBucket func(string) string
 }
 
-func ExpireOnS3(ctx context.Context, s3ControlClient s3controliface.S3ControlAPI, s3Client s3iface.S3API, c catalog.Cataloger, expiryResultsReader fileutil.RewindableReader, params *ExpireOnS3Params) {
+// ExpireOnS3 starts a goroutine to expire all entries on expiryResultsReader and returns a
+// channel that will receive all error results.
+func ExpireOnS3(ctx context.Context, s3ControlClient s3controliface.S3ControlAPI, s3Client s3iface.S3API, c catalog.Cataloger, expiryResultsReader fileutil.RewindableReader, params *ExpireOnS3Params) chan error {
+    errCh := make(chan error, 100)
     logger := logging.FromContext(ctx)
-    manifests, err := WriteExpiryManifestsFromReader(ctx, c, expiryResultsReader)
-    if err != nil {
-        logger.WithError(err).Error("write per-bucket manifests for expiry: %s (no expiry performed)", err)
-        return
-    }
-    type doneRec struct {
-        bucketName string
-        ok bool
-    }
-    tagCh := make(chan doneRec)
-    for bucketName, manifestReader := range manifests {
-        manifestUrl := params.ManifestUrlForBucket(bucketName)
-        bucketLogger := logger.WithFields(logging.Fields{"bucket": bucketName, "manifest_url": manifestUrl})
-        bucketLogger.Info("start expiry on S3")
-        go func(bucketName string, manifestReader io.ReadSeeker) {
-            params := BatchTagOnS3BucketParams{
-                AccountId: params.AccountId,
-                RoleArn: params.RoleArn,
-                BucketName: bucketName,
-                ManifestUrl: manifestUrl,
-            }
-            err := BatchTagOnS3Bucket(ctx, s3ControlClient, s3Client, manifestReader, &params)
-            if err != nil {
-                bucketLogger.WithError(err).Error("tag for expiry on S3")
-                tagCh <- doneRec{bucketName: bucketName, ok: false}
-            }
-            tagCh <- doneRec{bucketName: bucketName, ok: true}
-        }(bucketName, manifestReader)
-    }
-
-    taggedBuckets := make(map[string]struct{})
-    for i := 0; i < len(manifests); i++ {
-        done := <-tagCh
-        if done.ok {
-            taggedBuckets[done.bucketName] = struct{}{}
-        }
-    }
-
-    // Filter entries from successful buckets
-    err = expiryResultsReader.Rewind()
-    if err != nil {
-        logger.WithError(err).Error("[SEVERE] rewind expiry entries to expire on DB: entries may be lost")
-        // TODO(ariels): attempt to cancel jobs? (Tricky because those jobs are
-        // unlikely already to be available -- meanwhile failing to rewind a file is
-        // pretty much impossible.
-        return
-    }
-    decoder := json.NewDecoder(expiryResultsReader)
-
-    taggedRecordsByRepo := make(map[string] /*repositoryName*/ []*catalog.ExpireResult, 10000)
-    for recordNumber := 0; ; recordNumber++ {
-        recordLogger := logger.WithField("record_number", recordNumber)
-        record := catalog.ExpireResult{}
-        err = decoder.Decode(&record)
-        if errors.Is(err, io.EOF) {
-            break
-        }
-        if err != nil {
-            recordLogger.WithError(err).Warning("failed to read record; keep going, already lost this expiry")
-            continue
-        }
-        recordLogger = recordLogger.WithField("record", record)
-        repository, err := c.GetRepository(ctx, record.Repository)
-        if err != nil {
-            recordLogger.WithError(err).Warning("failed to get repository URI; keep going, already lost this expiry")
-            continue
-        }
-        qualifiedKey, err := block.ResolveNamespace(repository.StorageNamespace, record.PhysicalAddress)
-        if err != nil {
-            recordLogger.WithError(err).
-                Warning("could not resolve namespace; keep going, already lost this expiry")
-            continue
-        }
-        recordLogger = recordLogger.WithField("qualified_key", qualifiedKey)
-        bucketName := qualifiedKey.StorageNamespace
-        if _, ok := taggedBuckets[bucketName]; !ok {
-            continue
-        }
-        taggedRecordsByRepo[record.Repository] = append(taggedRecordsByRepo[record.Repository], &record)
-    }
-    for repositoryName, records := range taggedRecordsByRepo {
-        repositoryLogger := logger.WithFields(logging.Fields{"repository": repositoryName, "num_records": len(records)})
-        err := c.MarkExpired(ctx, repositoryName, records)
-        if err != nil {
-            // TODO(ariels): attempt to cancel jobs? (Tricky because those jobs are
-            // unlikely already to be available.)
-            repositoryLogger.WithError(err).Error("[SEVERE] failed to mark objects expired in catalog; S3 WILL expire them soon")
-        } else {
-            repositoryLogger.Info("marked objects expired in catalog")
-        }
-    }
+    errFields := FromLoggerContext(ctx)
+    go func() {
+        defer close(errCh)
+        manifests, err := WriteExpiryManifestsFromReader(ctx, c, expiryResultsReader)
+        if err != nil {
+            errCh <- MapError{errFields, fmt.Errorf("write per-bucket manifests for expiry: %s (no expiry performed)", err)}
+            return
+        }
+        type doneRec struct {
+            bucketName string
+            err error
+        }
+        tagCh := make(chan doneRec)
+        for bucketName, manifestReader := range manifests {
+            manifestUrl := params.ManifestUrlForBucket(bucketName)
+            bucketLogger := logger.WithFields(logging.Fields{"bucket": bucketName, "manifest_url": manifestUrl})
+            bucketFields := errFields.WithFields(Fields{"bucket": bucketName, "manifest_url": manifestUrl})
+            bucketLogger.Info("start expiry on S3")
+            go func(bucketName string, manifestReader io.ReadSeeker) {
+                params := BatchTagOnS3BucketParams{
+                    AccountId: params.AccountId,
+                    RoleArn: params.RoleArn,
+                    BucketName: bucketName,
+                    ManifestUrl: manifestUrl,
+                }
+                err := BatchTagOnS3Bucket(ctx, s3ControlClient, s3Client, manifestReader, &params)
+                if err != nil {
+                    tagCh <- doneRec{
+                        bucketName: bucketName,
+                        err: MapError{bucketFields, fmt.Errorf("tag for expiry on S3: %w", err)},
+                    }
+                }
+                tagCh <- doneRec{bucketName: bucketName}
+            }(bucketName, manifestReader)
+        }
+
+        taggedBuckets := make(map[string]struct{})
+        for i := 0; i < len(manifests); i++ {
+            done := <-tagCh
+            if done.err != nil {
+                errCh <- done.err
+                continue
+            }
+            taggedBuckets[done.bucketName] = struct{}{}
+        }
+
+        // Filter entries from successful buckets
+        err = expiryResultsReader.Rewind()
+        if err != nil {
+            errCh <- MapError{errFields, fmt.Errorf("[SEVERE] rewind expiry entries to expire on DB: %w; entries may be lost", err)}
+            // TODO(ariels): attempt to cancel jobs? (Tricky because those jobs are
+            // unlikely already to be available -- meanwhile failing to rewind a file is
+            // pretty much impossible.
+            return
+        }
+        decoder := json.NewDecoder(expiryResultsReader)
+
+        taggedRecordsByRepo := make(map[string] /*repositoryName*/ []*catalog.ExpireResult, 10000)
+        for recordNumber := 0; ; recordNumber++ {
+            recordFields := errFields.WithField("record_number", recordNumber)
+            record := catalog.ExpireResult{}
+            err = decoder.Decode(&record)
+            if errors.Is(err, io.EOF) {
+                break
+            }
+            if err != nil {
+                errCh <- MapError{recordFields, fmt.Errorf("failed to read record: %w; keep going, already lost this expiry", err)}
+                continue
+            }
+            recordFields = recordFields.WithField("record", record)
+            repository, err := c.GetRepository(ctx, record.Repository)
+            if err != nil {
+                errCh <- MapError{recordFields, fmt.Errorf("failed to get repository URI: %s; keep going, already lost this expiry", err)}
+                continue
+            }
+            qualifiedKey, err := block.ResolveNamespace(repository.StorageNamespace, record.PhysicalAddress)
+            if err != nil {
+                errCh <- MapError{recordFields, fmt.Errorf("could not resolve namespace: %w; keep going, already lost this expiry", err)}
+                continue
+            }
+            bucketName := qualifiedKey.StorageNamespace
+            if _, ok := taggedBuckets[bucketName]; !ok {
+                continue
+            }
+            taggedRecordsByRepo[record.Repository] = append(taggedRecordsByRepo[record.Repository], &record)
+        }
+        for repositoryName, records := range taggedRecordsByRepo {
+            repositoryLogger := logger.WithFields(logging.Fields{"repository": repositoryName, "num_records": len(records)})
+            repositoryFields := errFields.WithFields(Fields{"repository": repositoryName, "num_records": len(records)})
+            err := c.MarkExpired(ctx, repositoryName, records)
+            if err != nil {
+                // TODO(ariels): attempt to cancel jobs? (Tricky because those jobs are
+                // unlikely already to be available.)
+                errCh <- MapError{repositoryFields, fmt.Errorf("[SEVERE] mark objects expired in catalog: %w; but S3 WILL expire them soon", err)}
+            } else {
+                repositoryLogger.Info("marked objects expired in catalog")
+            }
+        }
+    }()
+    return errCh
 }
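The rewritten ExpireOnS3 above fans out one goroutine per bucket to run the S3 batch-tagging job, fans the per-bucket results back in over an internal channel, forwards failures to the caller's error channel, and only keeps records from successfully tagged buckets for marking as expired. The following is a self-contained sketch of that fan-out/fan-in shape, with a hypothetical tagBucket standing in for BatchTagOnS3Bucket.

package main

import (
    "fmt"
    "log"
)

// tagBucket is a hypothetical stand-in for BatchTagOnS3Bucket.
func tagBucket(bucket string) error {
    if bucket == "no-access" {
        return fmt.Errorf("CreateJob: access denied")
    }
    return nil
}

func main() {
    buckets := []string{"data-1", "no-access", "data-2"}

    type doneRec struct {
        bucketName string
        err        error
    }
    errCh := make(chan error, 100) // buffered, like the real errCh
    tagCh := make(chan doneRec)

    // Fan out: one tagging goroutine per bucket.
    for _, b := range buckets {
        go func(b string) {
            if err := tagBucket(b); err != nil {
                tagCh <- doneRec{bucketName: b, err: fmt.Errorf("tag for expiry on S3: %w", err)}
                return
            }
            tagCh <- doneRec{bucketName: b}
        }(b)
    }

    // Fan in: collect exactly one result per bucket; remember successes, forward failures.
    taggedBuckets := make(map[string]struct{})
    for range buckets {
        done := <-tagCh
        if done.err != nil {
            errCh <- done.err
            continue
        }
        taggedBuckets[done.bucketName] = struct{}{}
    }
    close(errCh)

    for err := range errCh {
        log.Print(err)
    }
    log.Printf("buckets whose records may be marked expired: %v", taggedBuckets)
}

In the real function the per-bucket results likewise go through an unbuffered tagCh, while the expire command drains errCh concurrently, as shown in cmd/lakefs/cmd/expire.go above.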
83 changes: 83 additions & 0 deletions retention/map_error.go
@@ -0,0 +1,83 @@
package retention

import (
"context"
"fmt"
"sort"
"strings"

"github.com/treeverse/lakefs/logging"
)

// Fields is a string-keyed map with a nice printed representation.
type Fields map[string]interface{}

func (f Fields) String() string {
keys := make([]string, 0, len(f))
for k := range f {
keys = append(keys, k)
}
sort.Strings(keys)

var b strings.Builder
sep := ""
for _, k := range keys {
b.WriteString(fmt.Sprintf("%s%s=%v", sep, k, f[k]))
if sep == "" {
sep = ", "
}
}
return b.String()
}

func copyFields(f Fields) Fields {
ret := Fields{}
for k, v := range f {
ret[k] = v
}
return ret
}

// WithField augments fields with another field.
func (f Fields) WithField(key string, value interface{}) Fields {
ret := copyFields(f)
ret[key] = value
return ret
}

// WithFields merges two Fields.
func (f Fields) WithFields(g Fields) Fields {
ret := copyFields(f)
for k, v := range g {
ret[k] = v
}
return ret
}

// FromLoggerContext returns Fields using logging keys from ctx. This is not stealing: logging
// exports the field key.
func FromLoggerContext(ctx context.Context) Fields {
ret := Fields{}
loggerFields := ctx.Value(logging.LogFieldsContextKey)
if loggerFields != nil {
for k, v := range loggerFields.(logging.Fields) {
ret[k] = v
}
}
return ret
}

// MapError wraps an error and adds multiple keyed additional Fields of string-keyed
// information.
type MapError struct {
Fields Fields
WrappedError error
}

func (m MapError) Unwrap() error {
return m.WrappedError
}

func (m MapError) Error() string {
return fmt.Sprintf("%+v %s", m.Fields, m.WrappedError)
}
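Since each value sent on the error channel is a MapError, a consumer that wants the structured context back (rather than the flat Error() string) can recover it with errors.As, and errors.Is keeps working through Unwrap. A small usage sketch follows; the wrapped error and field values are made up for illustration.

package main

import (
    "errors"
    "fmt"

    "github.com/treeverse/lakefs/retention"
)

func main() {
    base := fmt.Errorf("tag for expiry on S3: access denied")
    var err error = retention.MapError{
        Fields:       retention.Fields{"bucket": "data-1", "manifest_url": "s3://bucket/manifest.json"},
        WrappedError: base,
    }

    // Error() prints the sorted fields followed by the wrapped error.
    fmt.Println(err) // e.g. bucket=data-1, manifest_url=s3://bucket/manifest.json tag for expiry on S3: access denied

    // errors.Is sees through Unwrap to the wrapped error.
    fmt.Println(errors.Is(err, base)) // true

    // errors.As recovers the MapError itself, giving access to its Fields.
    var mapErr retention.MapError
    if errors.As(err, &mapErr) {
        fmt.Println(mapErr.Fields["bucket"]) // data-1
    }
}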
62 changes: 62 additions & 0 deletions retention/map_error_test.go
@@ -0,0 +1,62 @@
package retention_test

import (
"errors"
"fmt"
"testing"

"github.com/treeverse/lakefs/retention"
)

func TestFields_WithField(t *testing.T) {
fields := retention.Fields{}.WithField("a", 1)
if fields["a"].(int) != 1 {
t.Errorf("expected to set field \"a\" to 1, got %v", fields)
}
moreFields := fields.WithField("b", "two")
if moreFields["b"].(string) != "two" {
t.Errorf("expected to set field \"b\" to \"two\", got %v", moreFields)
}
if moreFields["a"].(int) != 1 {
t.Errorf("expected to keep field \"a\" at 1 after WithFields(\"b\", ...), got %v", moreFields)
}
if _, ok := fields["b"]; ok {
t.Errorf("expected WithFields(\"b\", ...) not to change original fields, got %v", fields)
}
}

func TestField_WithFields(t *testing.T) {
fieldsA := retention.Fields{}.WithField("a", 1)
fieldsB := retention.Fields{}.WithField("b", "two")
fields := fieldsA.WithFields(fieldsB)

if _, ok := fields["a"]; !ok {
t.Errorf("expected field \"a\" on merged fields, got %v", fields)
}
if _, ok := fields["b"]; !ok {
t.Errorf("expected field \"b\" on merged fields, got %v", fields)
}
if _, ok := fieldsA["b"]; ok {
t.Errorf("expected WithFields(...) not to change original fields, got %v", fieldsA)
}
if _, ok := fieldsB["a"]; ok {
t.Errorf("expected WithFields(...) not to change argument fields, got %v", fieldsB)
}
}

func TestField_String(t *testing.T) {
fields := retention.Fields{}.WithField("foo", "xyzzy").WithField("bar", 22)
s := fields.String()
if s != "bar=22, foo=xyzzy" {
t.Errorf("unexpected string representation of %v: %s", fields, s)
}
}

var wErr error = fmt.Errorf("error for testing")

func TestMapError(t *testing.T) {
err := retention.MapError{Fields: retention.Fields{"a": 1, "b": 2}, WrappedError: wErr}
if !errors.Is(err, wErr) {
t.Errorf("error %s failed to wrap its base %s", err, wErr)
}
}