Skip to content

Commit

Permalink
Various S3 Fixes (#10)
Browse files Browse the repository at this point in the history
Various S3 fixes around marker file handling:
- Deletion and overwrite now also trigger a marker update
- Require the protocol in the option's URL
  • Loading branch information
ahouene authored Nov 11, 2022
1 parent a779f8f commit f135622
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 70 deletions.
30 changes: 30 additions & 0 deletions backends/s3/marker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package s3

import (
"context"
"fmt"
"time"
)

// setMarker records the most recent mutation in the object identified by
// UpdateMarkerFilename and resets the locally cached listing.
//
// The marker content is formatted as "name:etag:nanos:isDel", where nanos is
// the current Unix time in nanoseconds. Callers pass an empty etag together
// with isDel set to true when the object identified by name was deleted.
//
// When the UseUpdateMarker option is disabled, nothing is written and nil is
// returned. An error from storing the marker is returned unchanged, and the
// cached state is left untouched in that case.
func (b *Backend) setMarker(ctx context.Context, name, etag string, isDel bool) error {
	if !b.opt.UseUpdateMarker {
		return nil
	}

	marker := fmt.Sprintf("%s:%s:%d:%v", name, etag, time.Now().UnixNano(), isDel)
	if _, err := b.doStore(ctx, UpdateMarkerFilename, []byte(marker)); err != nil {
		return err
	}

	// Drop the cached list and remember the marker we just wrote, both under
	// the backend mutex.
	b.mu.Lock()
	b.lastList = nil
	b.lastMarker = marker
	b.mu.Unlock()

	return nil
}
107 changes: 60 additions & 47 deletions backends/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package s3
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/url"
Expand All @@ -14,8 +15,8 @@ import (
"time"

"github.com/PowerDNS/go-tlsconfig"
"github.com/minio/minio-go/v7"
"github.com/go-logr/logr"
minio "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"

"github.com/PowerDNS/simpleblob"
Expand All @@ -24,7 +25,7 @@ import (
const (
// DefaultEndpointURL is the default S3 endpoint to use if none is set.
// Here, no custom endpoint assumes AWS endpoint.
DefaultEndpointURL = "s3.amazonaws.com"
DefaultEndpointURL = "https://s3.amazonaws.com"
// DefaultRegion is the default S3 region to use, if none is configured
DefaultRegion = "us-east-1"
// DefaultInitTimeout is the time we allow for initialisation, like credential
Expand All @@ -51,7 +52,7 @@ type Options struct {
CreateBucket bool `yaml:"create_bucket"`

// EndpointURL can be set to something like "http://localhost:9000" when using Minio
// or "s3.amazonaws.com" for AWS S3.
// or "https://s3.amazonaws.com" for AWS S3.
EndpointURL string `yaml:"endpoint_url"`

// TLS allows customising the TLS configuration
Expand All @@ -65,7 +66,7 @@ type Options struct {

// UseUpdateMarker makes the backend write and read a file to determine if
// it can cache the last List command. The file contains the name of the
// last file stored.
// last file stored or deleted.
// This can reduce the number of LIST commands sent to S3, replacing them
// with GET commands that are about 12x cheaper.
// If enabled, it MUST be enabled on all instances!
Expand Down Expand Up @@ -113,36 +114,36 @@ func (b *Backend) List(ctx context.Context, prefix string) (simpleblob.BlobList,
return b.doList(ctx, prefix)
}

// Request and cache full list, and use marker file to invalidate the cache
now := time.Now()
data, err := b.Load(ctx, UpdateMarkerFilename)
if err != nil && !os.IsNotExist(err) {
return nil, err
m, err := b.Load(ctx, UpdateMarkerFilename)
exists := !errors.Is(err, os.ErrNotExist)
if err != nil && exists {
return nil, err
}
current := string(data)
upstreamMarker := string(m)

b.mu.Lock()
lastMarker := b.lastMarker
age := now.Sub(b.lastTime)
mustUpdate := b.lastList == nil ||
upstreamMarker != b.lastMarker ||
time.Since(b.lastTime) >= b.opt.UpdateMarkerForceListInterval ||
!exists
blobs := b.lastList
b.mu.Unlock()

var blobs simpleblob.BlobList
if current != lastMarker || age >= b.opt.UpdateMarkerForceListInterval {
// Update cache
blobs, err = b.doList(ctx, "") // all, no prefix
if err != nil {
return nil, err
}
if !mustUpdate {
return blobs.WithPrefix(prefix), nil
}

b.mu.Lock()
b.lastMarker = current
b.lastList = blobs
b.mu.Unlock()
} else {
b.mu.Lock()
blobs = b.lastList
b.mu.Unlock()
blobs, err = b.doList(ctx, "") // We want to cache all, so no prefix
if err != nil {
return nil, err
}

b.mu.Lock()
b.lastMarker = upstreamMarker
b.lastList = blobs
b.lastTime = time.Now()
b.mu.Unlock()

return blobs.WithPrefix(prefix), nil
}

Expand All @@ -169,57 +170,64 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis
return blobs, nil
}

// Load retrieves the content of the object identified by name from S3 Bucket
// configured in b.
func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) {
metricCalls.WithLabelValues("load").Inc()
metricLastCallTimestamp.WithLabelValues("load").SetToCurrentTime()

obj, err := b.client.GetObject(ctx, b.opt.Bucket, name, minio.GetObjectOptions{})
if err != nil {
if err = handleErrorResponse(err); err != nil {
return nil, err
}
if err = convertMinioError(err); err != nil {
return nil, err
} else if obj == nil {
return nil, os.ErrNotExist
}

p, err := io.ReadAll(obj)
if err = handleErrorResponse(err); err != nil {
if err = convertMinioError(err); err != nil {
return nil, err
}
return p, nil
}

// Store sets the content of the object identified by name to the content
// of data, in the S3 Bucket configured in b.
func (b *Backend) Store(ctx context.Context, name string, data []byte) error {
if err := b.doStore(ctx, name, data); err != nil {
info, err := b.doStore(ctx, name, data)
if err != nil {
return err
}
if b.opt.UseUpdateMarker {
if err := b.doStore(ctx, UpdateMarkerFilename, []byte(name)); err != nil {
return err
}
}
return nil
return b.setMarker(ctx, name, info.ETag, false)
}

func (b *Backend) doStore(ctx context.Context, name string, data []byte) error {
func (b *Backend) doStore(ctx context.Context, name string, data []byte) (minio.UploadInfo, error) {
metricCalls.WithLabelValues("store").Inc()
metricLastCallTimestamp.WithLabelValues("store").SetToCurrentTime()

_, err := b.client.PutObject(ctx, b.opt.Bucket, name, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{
info, err := b.client.PutObject(ctx, b.opt.Bucket, name, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{
NumThreads: 3,
})
if err != nil {
metricCallErrors.WithLabelValues("store").Inc()
}
return err
return info, err
}

// Delete removes the object identified by name from the S3 Bucket configured
// in b, then records the deletion through setMarker.
//
// If the removal fails, its error is returned and setMarker is not called.
// Otherwise the result of setMarker is returned.
func (b *Backend) Delete(ctx context.Context, name string) error {
	err := b.doDelete(ctx, name)
	if err != nil {
		return err
	}
	// A deleted object has no etag, so pass an empty one and flag the deletion.
	return b.setMarker(ctx, name, "", true)
}

func (b *Backend) doDelete(ctx context.Context, name string) error {
metricCalls.WithLabelValues("delete").Inc()
metricLastCallTimestamp.WithLabelValues("delete").SetToCurrentTime()

err := b.client.RemoveObject(ctx, b.opt.Bucket, name, minio.RemoveObjectOptions{})
if err = handleErrorResponse(err); err != nil {
if err = convertMinioError(err); err != nil {
metricCallErrors.WithLabelValues("delete").Inc()
}
return err
Expand Down Expand Up @@ -288,8 +296,10 @@ func New(ctx context.Context, opt Options) (*Backend, error) {
// Ok, no SSL
case "https":
useSSL = true
case "":
return nil, fmt.Errorf("no scheme provided for endpoint URL '%s', use http or https.", opt.EndpointURL)
default:
return nil, fmt.Errorf("unsupported scheme for S3: '%s'", u.Scheme)
return nil, fmt.Errorf("unsupported scheme for S3: '%s', use http or https.", u.Scheme)
}

cfg := &minio.Options{
Expand Down Expand Up @@ -320,7 +330,7 @@ func New(ctx context.Context, opt Options) (*Backend, error) {

err := client.MakeBucket(ctx, opt.Bucket, minio.MakeBucketOptions{Region: opt.Region})
if err != nil {
if err := handleErrorResponse(err); err != nil {
if err := convertMinioError(err); err != nil {
return nil, err
}
}
Expand All @@ -336,11 +346,14 @@ func New(ctx context.Context, opt Options) (*Backend, error) {
return b, nil
}

// handleErrorResponse takes an error, possibly a minio.ErrorResponse
// convertMinioError takes an error, possibly a minio.ErrorResponse
// and turns it into a well known error when possible.
// If error is not well known, it is returned as is.
// If error is considered to be ignorable, nil is returned.
func handleErrorResponse(err error) error {
func convertMinioError(err error) error {
if err == nil {
return nil
}
errResp := minio.ToErrorResponse(err)
if errResp.StatusCode == 404 {
return os.ErrNotExist
Expand Down
18 changes: 12 additions & 6 deletions backends/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func getBackend(ctx context.Context, t *testing.T) (b *Backend) {
require.NoError(t, err)

cleanup := func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

blobs, err := b.doList(ctx, "")
if err != nil {
t.Logf("Blobs list error: %s", err)
Expand All @@ -74,24 +77,27 @@ func getBackend(ctx context.Context, t *testing.T) (b *Backend) {

func TestBackend(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
t.Cleanup(cancel)
defer cancel()

b := getBackend(ctx, t)
tester.DoBackendTests(t, b)
assert.Equal(t, "", b.lastMarker)
assert.Len(t, b.lastMarker, 0)
}

func TestBackend_marker(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
t.Cleanup(cancel)
defer cancel()

b := getBackend(ctx, t)
b.opt.UseUpdateMarker = true

tester.DoBackendTests(t, b)
assert.Equal(t, "bar-1", b.lastMarker)
assert.Regexp(t, "^foo-1:[A-Za-z0-9]*:[0-9]+:true$", b.lastMarker)
// ^ reflects last write operation of tester.DoBackendTests
// i.e. deleting "foo-1"

data, err := b.Load(ctx, UpdateMarkerFilename)
// Marker file should have been written accordingly
markerFileContent, err := b.Load(ctx, UpdateMarkerFilename)
assert.NoError(t, err)
assert.Equal(t, "bar-1", string(data))
assert.EqualValues(t, b.lastMarker, markerFileContent)
}
50 changes: 33 additions & 17 deletions test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,49 @@
set -e

# Stop all child processes on exit
trap "pkill -P $$" EXIT
trap cleanup EXIT

# cleanup is the EXIT handler: it stops all child processes of this script
# and removes the temporary directory referenced by $tmpdir, if any.
cleanup() {
    # pkill exits non-zero when no process matched (e.g. the server already
    # died). With `set -e` active that would abort this handler before the
    # temporary directory is removed, so the failure is tolerated here.
    pkill -P $$ || true
    if [ -n "$tmpdir" ] && [ -d "$tmpdir" ]; then
        rm -rf "$tmpdir"
    fi
}

export GOBIN="$PWD/bin"
export PATH="$GOBIN:$PATH"
tmpdir=

mkdir -p "$GOBIN"

if [ -z "$SIMPLEBLOB_TEST_S3_CONFIG" ]; then
echo "* Using MinIO for S3 tests"
export SIMPLEBLOB_TEST_S3_CONFIG="$PWD/test-minio.json"

# Check for existing minio
minio=$(which minio || true)

# Fetch minio if not found
if [ -z "$minio" ]; then
#minioversion="github.com/minio/[email protected]"
#echo "+ go install $minioversion" > /dev/stderr
#go install "$minioversion"
if ! command -v minio >/dev/null; then
source <(go env)
curl -v -o "$GOBIN/minio" "https://dl.min.io/server/minio/release/$GOOS-$GOARCH/minio"
minio=./bin/minio
chmod 6755 "$minio"
dst="$GOBIN/minio"
curl -v -o "$dst" "https://dl.min.io/server/minio/release/$GOOS-$GOARCH/minio"
chmod u+x "$dst"
fi

# Start MinIO
echo "* Starting $minio on port 34730"
echo "* Starting minio at address 127.0.0.1:34730"
tmpdir=$(mktemp -d -t minio.XXXXXX)
"$minio" server --address 127.0.0.1:34730 --console-address 127.0.0.1:34731 --quiet "$tmpdir" &
sleep 3
minio server --address 127.0.0.1:34730 --console-address 127.0.0.1:34731 --quiet "$tmpdir" &
# Wait for minio server to be ready
i=0
while ! curl -s -I "127.0.0.1:34730/minio/health/ready" | grep '200 OK' >/dev/null; do
i=$((i+1))
if [ "$i" -ge 10 ]; then
# We have been waiting for server to start for 10 seconds
echo "Minio could not start properly"
curl -s -I "127.0.0.1:34730/minio/health/ready"
exit 1
fi
sleep 1
done
fi

echo "* SIMPLEBLOB_TEST_S3_CONFIG=$SIMPLEBLOB_TEST_S3_CONFIG"
Expand All @@ -39,6 +55,6 @@ set -ex
go test -count=1 "$@" ./...

# Configure linters in .golangci.yml
go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.50.1
./bin/golangci-lint run

expected_version=v1.50.1
go install github.com/golangci/golangci-lint/cmd/golangci-lint@"$expected_version"
golangci-lint run
9 changes: 9 additions & 0 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,13 @@ func DoBackendTests(t *testing.T, b simpleblob.Interface) {
// Delete non-existing
err = b.Delete(ctx, "foo-1")
assert.NoError(t, err)

// Should not exist anymore
_, err = b.Load(ctx, "foo-1")
assert.ErrorIs(t, err, os.ErrNotExist)

// Should not appear in list anymore
ls, err = b.List(ctx, "")
assert.NoError(t, err)
assert.NotContains(t, ls.Names(), "foo-1")
}

0 comments on commit f135622

Please sign in to comment.