diff --git a/docker/base/Dockerfile b/docker/base/Dockerfile index ecab9ab6c42..bc81925d79b 100644 --- a/docker/base/Dockerfile +++ b/docker/base/Dockerfile @@ -25,6 +25,9 @@ FROM vitess/bootstrap:mysql57 # Allows some docker builds to disable CGO ARG CGO_ENABLED=0 +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree USER root COPY . /vt/src/vitess.io/vitess diff --git a/docker/base/Dockerfile.mariadb b/docker/base/Dockerfile.mariadb index a3f9771334f..42e09d1a9d2 100644 --- a/docker/base/Dockerfile.mariadb +++ b/docker/base/Dockerfile.mariadb @@ -1,5 +1,11 @@ FROM vitess/bootstrap:mariadb +# Allows some docker builds to disable CGO +ARG CGO_ENABLED=0 + +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree USER root COPY . /vt/src/vitess.io/vitess diff --git a/docker/base/Dockerfile.mariadb103 b/docker/base/Dockerfile.mariadb103 index 75811dc68ba..cc86d5aede5 100644 --- a/docker/base/Dockerfile.mariadb103 +++ b/docker/base/Dockerfile.mariadb103 @@ -1,5 +1,11 @@ FROM vitess/bootstrap:mariadb103 +# Allows some docker builds to disable CGO +ARG CGO_ENABLED=0 + +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree USER root COPY . /vt/src/vitess.io/vitess diff --git a/docker/base/Dockerfile.mysql56 b/docker/base/Dockerfile.mysql56 index 5d1f55079fa..efdbade8ced 100644 --- a/docker/base/Dockerfile.mysql56 +++ b/docker/base/Dockerfile.mysql56 @@ -1,5 +1,11 @@ FROM vitess/bootstrap:mysql56 +# Allows some docker builds to disable CGO +ARG CGO_ENABLED=0 + +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree USER root COPY . /vt/src/vitess.io/vitess diff --git a/docker/base/Dockerfile.mysql80 b/docker/base/Dockerfile.mysql80 index 5b2fc5c91ae..4a798700060 100644 --- a/docker/base/Dockerfile.mysql80 +++ b/docker/base/Dockerfile.mysql80 @@ -1,5 +1,11 @@ FROM vitess/bootstrap:mysql80 +# Allows some docker builds to disable CGO +ARG CGO_ENABLED=0 + +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree USER root COPY . /vt/src/vitess.io/vitess diff --git a/docker/base/Dockerfile.percona b/docker/base/Dockerfile.percona index 93a218ff059..5b488680126 100644 --- a/docker/base/Dockerfile.percona +++ b/docker/base/Dockerfile.percona @@ -1,5 +1,11 @@ FROM vitess/bootstrap:percona +# Allows some docker builds to disable CGO +ARG CGO_ENABLED=0 + +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree USER root COPY . /vt/src/vitess.io/vitess diff --git a/docker/base/Dockerfile.percona57 b/docker/base/Dockerfile.percona57 index 4f396c9c86b..4a3aa6b0c7c 100644 --- a/docker/base/Dockerfile.percona57 +++ b/docker/base/Dockerfile.percona57 @@ -1,5 +1,11 @@ FROM vitess/bootstrap:percona57 +# Allows some docker builds to disable CGO +ARG CGO_ENABLED=0 + +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree USER root COPY . /vt/src/vitess.io/vitess diff --git a/docker/base/Dockerfile.percona80 b/docker/base/Dockerfile.percona80 index 83672111166..f5997a16e1a 100644 --- a/docker/base/Dockerfile.percona80 +++ b/docker/base/Dockerfile.percona80 @@ -1,5 +1,11 @@ FROM vitess/bootstrap:percona80 +# Allows some docker builds to disable CGO +ARG CGO_ENABLED=0 + +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree USER root COPY . /vt/src/vitess.io/vitess diff --git a/docker/lite/Dockerfile.alpine b/docker/lite/Dockerfile.alpine index 4082bb04b67..84b19c3a9bf 100644 --- a/docker/lite/Dockerfile.alpine +++ b/docker/lite/Dockerfile.alpine @@ -22,6 +22,9 @@ FROM vitess/bootstrap:mariadb103 AS builder # Allows some docker builds to disable CGO ARG CGO_ENABLED=0 +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree. COPY --chown=vitess:vitess . /vt/src/vitess.io/vitess diff --git a/docker/lite/Dockerfile.mariadb b/docker/lite/Dockerfile.mariadb index ce9c8cf4b4f..c99858aade5 100644 --- a/docker/lite/Dockerfile.mariadb +++ b/docker/lite/Dockerfile.mariadb @@ -22,6 +22,9 @@ FROM vitess/bootstrap:mariadb AS builder # Allows some docker builds to disable CGO ARG CGO_ENABLED=0 +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree. COPY --chown=vitess:vitess . /vt/src/vitess.io/vitess diff --git a/docker/lite/Dockerfile.mariadb103 b/docker/lite/Dockerfile.mariadb103 index f0833f81f51..2127b3423c9 100644 --- a/docker/lite/Dockerfile.mariadb103 +++ b/docker/lite/Dockerfile.mariadb103 @@ -22,6 +22,9 @@ FROM vitess/bootstrap:mariadb103 AS builder # Allows some docker builds to disable CGO ARG CGO_ENABLED=0 +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree. COPY --chown=vitess:vitess . /vt/src/vitess.io/vitess diff --git a/docker/lite/Dockerfile.mysql56 b/docker/lite/Dockerfile.mysql56 index 7b78af0aa89..306c2e2ac1c 100644 --- a/docker/lite/Dockerfile.mysql56 +++ b/docker/lite/Dockerfile.mysql56 @@ -22,6 +22,9 @@ FROM vitess/bootstrap:mysql56 AS builder # Allows some docker builds to disable CGO ARG CGO_ENABLED=0 +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree. COPY --chown=vitess:vitess . /vt/src/vitess.io/vitess diff --git a/docker/lite/Dockerfile.mysql57 b/docker/lite/Dockerfile.mysql57 index 25535464af8..a024cbcb6d5 100644 --- a/docker/lite/Dockerfile.mysql57 +++ b/docker/lite/Dockerfile.mysql57 @@ -22,6 +22,9 @@ FROM vitess/bootstrap:mysql57 AS builder # Allows some docker builds to disable CGO ARG CGO_ENABLED=0 +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree. COPY --chown=vitess:vitess . /vt/src/vitess.io/vitess diff --git a/docker/lite/Dockerfile.mysql80 b/docker/lite/Dockerfile.mysql80 index 4d0ad6d6ec8..1d3a48d3220 100644 --- a/docker/lite/Dockerfile.mysql80 +++ b/docker/lite/Dockerfile.mysql80 @@ -22,6 +22,9 @@ FROM vitess/bootstrap:mysql80 AS builder # Allows some docker builds to disable CGO ARG CGO_ENABLED=0 +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree. COPY --chown=vitess:vitess . /vt/src/vitess.io/vitess diff --git a/docker/lite/Dockerfile.percona b/docker/lite/Dockerfile.percona index a3e77350672..ba0de684220 100644 --- a/docker/lite/Dockerfile.percona +++ b/docker/lite/Dockerfile.percona @@ -22,6 +22,9 @@ FROM vitess/bootstrap:percona AS builder # Allows some docker builds to disable CGO ARG CGO_ENABLED=0 +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree. COPY --chown=vitess:vitess . /vt/src/vitess.io/vitess diff --git a/docker/lite/Dockerfile.percona57 b/docker/lite/Dockerfile.percona57 index 3ac6d264f8b..f4abff9a92a 100644 --- a/docker/lite/Dockerfile.percona57 +++ b/docker/lite/Dockerfile.percona57 @@ -22,6 +22,9 @@ FROM vitess/bootstrap:percona57 AS builder # Allows some docker builds to disable CGO ARG CGO_ENABLED=0 +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree. COPY --chown=vitess:vitess . /vt/src/vitess.io/vitess diff --git a/docker/lite/Dockerfile.percona80 b/docker/lite/Dockerfile.percona80 index 71f258d7e03..add8229d3a7 100644 --- a/docker/lite/Dockerfile.percona80 +++ b/docker/lite/Dockerfile.percona80 @@ -22,6 +22,9 @@ FROM vitess/bootstrap:percona80 AS builder # Allows some docker builds to disable CGO ARG CGO_ENABLED=0 +# Allows docker builds to set the BUILD_NUMBER +ARG BUILD_NUMBER + # Re-copy sources from working tree. COPY --chown=vitess:vitess . /vt/src/vitess.io/vitess diff --git a/go/sqltypes/result.go b/go/sqltypes/result.go index b12378b2ebf..51cde66f570 100644 --- a/go/sqltypes/result.go +++ b/go/sqltypes/result.go @@ -159,13 +159,14 @@ func ResultsEqual(r1, r2 []Result) bool { // Every place this function is called, a comment is needed that explains // why it's justified. func MakeRowTrusted(fields []*querypb.Field, row *querypb.Row) []Value { - sqlRow := make([]Value, len(row.Lengths)) + sqlRow := make([]Value, len(fields)) var offset int64 - for i, length := range row.Lengths { + for i, fld := range fields { + length := row.Lengths[i] if length < 0 { continue } - sqlRow[i] = MakeTrusted(fields[i].Type, row.Values[offset:offset+length]) + sqlRow[i] = MakeTrusted(fld.Type, row.Values[offset:offset+length]) offset += length } return sqlRow diff --git a/go/sqltypes/result_test.go b/go/sqltypes/result_test.go index bf2d9fd87fe..43e72a089fb 100644 --- a/go/sqltypes/result_test.go +++ b/go/sqltypes/result_test.go @@ -23,6 +23,61 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" ) +func TestMakeRowTrusted(t *testing.T) { + fields := MakeTestFields( + "some_int|some_text|another_int", + "int8|varchar|int8", + ) + + values := []byte{} + hw := []byte("hello, world") + values = append(values, hw...) + values = append(values, byte(42)) + + row := &querypb.Row{ + Lengths: []int64{-1, int64(len(hw)), 1}, + Values: values, + } + + want := []Value{ + MakeTrusted(querypb.Type_NULL_TYPE, nil), + MakeTrusted(querypb.Type_VARCHAR, []byte("hello, world")), + MakeTrusted(querypb.Type_INT8, []byte{byte(42)}), + } + + result := MakeRowTrusted(fields, row) + if !reflect.DeepEqual(result, want) { + t.Errorf("MakeRowTrusted:\ngot: %#v\nwant: %#v", result, want) + } +} + +func TestMakeRowTrustedDoesNotPanicOnNewColumns(t *testing.T) { + fields := MakeTestFields( + "some_int|some_text", + "int8|varchar", + ) + + values := []byte{byte(123)} + hw := []byte("hello, world") + values = append(values, hw...) + values = append(values, byte(42)) + + row := &querypb.Row{ + Lengths: []int64{1, int64(len(hw)), 1}, + Values: values, + } + + want := []Value{ + MakeTrusted(querypb.Type_INT8, []byte{byte(123)}), + MakeTrusted(querypb.Type_VARCHAR, []byte("hello, world")), + } + + result := MakeRowTrusted(fields, row) + if !reflect.DeepEqual(result, want) { + t.Errorf("MakeRowTrusted:\ngot: %#v\nwant: %#v", result, want) + } +} + func TestRepair(t *testing.T) { fields := []*querypb.Field{{ Type: Int64, diff --git a/go/vt/mysqlctl/azblobbackupstorage/azblob.go b/go/vt/mysqlctl/azblobbackupstorage/azblob.go index a9346506e82..19a8ab331a0 100644 --- a/go/vt/mysqlctl/azblobbackupstorage/azblob.go +++ b/go/vt/mysqlctl/azblobbackupstorage/azblob.go @@ -127,6 +127,21 @@ func (bh *AZBlobBackupHandle) Name() string { return bh.name } +// RecordError is part of the concurrency.ErrorRecorder interface. +func (bh *AZBlobBackupHandle) RecordError(err error) { + bh.errors.RecordError(err) +} + +// HasErrors is part of the concurrency.ErrorRecorder interface. +func (bh *AZBlobBackupHandle) HasErrors() bool { + return bh.errors.HasErrors() +} + +// Error is part of the concurrency.ErrorRecorder interface. +func (bh *AZBlobBackupHandle) Error() error { + return bh.errors.Error() +} + // AddFile implements BackupHandle. func (bh *AZBlobBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) { if bh.readOnly { @@ -156,7 +171,7 @@ func (bh *AZBlobBackupHandle) AddFile(ctx context.Context, filename string, file }) if err != nil { reader.CloseWithError(err) - bh.errors.RecordError(err) + bh.RecordError(err) } }() @@ -169,7 +184,7 @@ func (bh *AZBlobBackupHandle) EndBackup(ctx context.Context) error { return fmt.Errorf("EndBackup cannot be called on read-only backup") } bh.waitGroup.Wait() - return bh.errors.Error() + return bh.Error() } // AbortBackup implements BackupHandle. diff --git a/go/vt/mysqlctl/backupstorage/interface.go b/go/vt/mysqlctl/backupstorage/interface.go index e4c2e6bc18d..78720a12e08 100644 --- a/go/vt/mysqlctl/backupstorage/interface.go +++ b/go/vt/mysqlctl/backupstorage/interface.go @@ -24,6 +24,7 @@ import ( "io" "golang.org/x/net/context" + "vitess.io/vitess/go/vt/concurrency" ) var ( @@ -74,6 +75,10 @@ type BackupHandle interface { // The context is valid for the duration of the reads, until the // ReadCloser is closed. ReadFile(ctx context.Context, filename string) (io.ReadCloser, error) + + // concurrency.ErrorRecorder is embedded here to coordinate reporting and + // handling of errors among all the components involved in taking a backup. + concurrency.ErrorRecorder } // BackupStorage is the interface to the storage system diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index f49f8fdf780..25c8b87a768 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -276,7 +276,6 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar // Backup with the provided concurrency. sema := sync2.NewSemaphore(params.Concurrency, 0) - rec := concurrency.AllErrorRecorder{} wg := sync.WaitGroup{} for i := range fes { wg.Add(1) @@ -287,19 +286,28 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar // encountered an error. sema.Acquire() defer sema.Release() - if rec.HasErrors() { + if bh.HasErrors() { return } // Backup the individual file. name := fmt.Sprintf("%v", i) - rec.RecordError(be.backupFile(ctx, params, bh, &fes[i], name)) + bh.RecordError(be.backupFile(ctx, params, bh, &fes[i], name)) }(i) } wg.Wait() - if rec.HasErrors() { - return rec.Error() + + // BackupHandle supports the ErrorRecorder interface for tracking errors + // across any goroutines that fan out to take the backup. This means that we + // don't need a local error recorder and can put everything through the bh. + // + // This handles the scenario where bh.AddFile() encounters an error asynchronously, + // which ordinarily would be lost in the context of `be.backupFile`, i.e. if an + // error were encountered + // [here](https://github.com/vitessio/vitess/blob/d26b6c7975b12a87364e471e2e2dfa4e253c2a5b/go/vt/mysqlctl/s3backupstorage/s3.go#L139-L142). + if bh.HasErrors() { + return bh.Error() } // open the MANIFEST diff --git a/go/vt/mysqlctl/cephbackupstorage/ceph.go b/go/vt/mysqlctl/cephbackupstorage/ceph.go index 80c37ceb828..085ecd94828 100644 --- a/go/vt/mysqlctl/cephbackupstorage/ceph.go +++ b/go/vt/mysqlctl/cephbackupstorage/ceph.go @@ -62,6 +62,21 @@ type CephBackupHandle struct { waitGroup sync.WaitGroup } +// RecordError is part of the concurrency.ErrorRecorder interface. +func (bh *CephBackupHandle) RecordError(err error) { + bh.errors.RecordError(err) +} + +// HasErrors is part of the concurrency.ErrorRecorder interface. +func (bh *CephBackupHandle) HasErrors() bool { + return bh.errors.HasErrors() +} + +// Error is part of the concurrency.ErrorRecorder interface. +func (bh *CephBackupHandle) Error() error { + return bh.errors.Error() +} + // Directory implements BackupHandle. func (bh *CephBackupHandle) Directory() string { return bh.dir @@ -94,7 +109,7 @@ func (bh *CephBackupHandle) AddFile(ctx context.Context, filename string, filesi // Signal the writer that an error occurred, in case it's not done writing yet. reader.CloseWithError(err) // In case the error happened after the writer finished, we need to remember it. - bh.errors.RecordError(err) + bh.RecordError(err) } }() // Give our caller the write end of the pipe. @@ -108,7 +123,7 @@ func (bh *CephBackupHandle) EndBackup(ctx context.Context) error { } bh.waitGroup.Wait() // Return the saved PutObject() errors, if any. - return bh.errors.Error() + return bh.Error() } // AbortBackup implements BackupHandle. diff --git a/go/vt/mysqlctl/filebackupstorage/file.go b/go/vt/mysqlctl/filebackupstorage/file.go index c99a0ee6b0a..be5810c1a94 100644 --- a/go/vt/mysqlctl/filebackupstorage/file.go +++ b/go/vt/mysqlctl/filebackupstorage/file.go @@ -28,6 +28,7 @@ import ( "golang.org/x/net/context" + "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" ) @@ -43,6 +44,22 @@ type FileBackupHandle struct { dir string name string readOnly bool + errors concurrency.AllErrorRecorder +} + +// RecordError is part of the concurrency.ErrorRecorder interface. +func (fbh *FileBackupHandle) RecordError(err error) { + fbh.errors.RecordError(err) +} + +// HasErrors is part of the concurrency.ErrorRecorder interface. +func (fbh *FileBackupHandle) HasErrors() bool { + return fbh.errors.HasErrors() +} + +// Error is part of the concurrency.ErrorRecorder interface. +func (fbh *FileBackupHandle) Error() error { + return fbh.errors.Error() } // Directory is part of the BackupHandle interface diff --git a/go/vt/mysqlctl/gcsbackupstorage/gcs.go b/go/vt/mysqlctl/gcsbackupstorage/gcs.go index 5473be3217a..7ca9f4796d6 100644 --- a/go/vt/mysqlctl/gcsbackupstorage/gcs.go +++ b/go/vt/mysqlctl/gcsbackupstorage/gcs.go @@ -33,6 +33,7 @@ import ( "google.golang.org/api/option" "vitess.io/vitess/go/trace" + "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" ) @@ -51,6 +52,22 @@ type GCSBackupHandle struct { dir string name string readOnly bool + errors concurrency.AllErrorRecorder +} + +// RecordError is part of the concurrency.ErrorRecorder interface. +func (bh *GCSBackupHandle) RecordError(err error) { + bh.errors.RecordError(err) +} + +// HasErrors is part of the concurrency.ErrorRecorder interface. +func (bh *GCSBackupHandle) HasErrors() bool { + return bh.errors.HasErrors() +} + +// Error is part of the concurrency.ErrorRecorder interface. +func (bh *GCSBackupHandle) Error() error { + return bh.errors.Error() } // Directory implements BackupHandle. diff --git a/go/vt/mysqlctl/s3backupstorage/retryer.go b/go/vt/mysqlctl/s3backupstorage/retryer.go index d3aab346259..052b1ef26d1 100644 --- a/go/vt/mysqlctl/s3backupstorage/retryer.go +++ b/go/vt/mysqlctl/s3backupstorage/retryer.go @@ -35,7 +35,9 @@ func (retryer *ClosedConnectionRetryer) ShouldRetry(r *request.Request) bool { if r.Error != nil { if awsErr, ok := r.Error.(awserr.Error); ok { - return strings.Contains(awsErr.OrigErr().Error(), "use of closed network connection") + if strings.Contains(awsErr.Error(), "use of closed network connection") { + return true + } } } diff --git a/go/vt/mysqlctl/s3backupstorage/retryer_test.go b/go/vt/mysqlctl/s3backupstorage/retryer_test.go new file mode 100644 index 00000000000..35f83221170 --- /dev/null +++ b/go/vt/mysqlctl/s3backupstorage/retryer_test.go @@ -0,0 +1,79 @@ +package s3backupstorage + +import ( + "errors" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/stretchr/testify/assert" +) + +type testRetryer struct{} + +func (r *testRetryer) MaxRetries() int { return 5 } +func (r *testRetryer) RetryRules(req *request.Request) time.Duration { return time.Second } +func (r *testRetryer) ShouldRetry(req *request.Request) bool { return false } + +func TestShouldRetry(t *testing.T) { + tests := []struct { + name string + r *request.Request + expected bool + }{ + + { + name: "non retryable request", + r: &request.Request{ + Retryable: aws.Bool(false), + }, + expected: false, + }, + { + name: "retryable request", + r: &request.Request{ + Retryable: aws.Bool(true), + }, + expected: true, + }, + { + name: "non aws error", + r: &request.Request{ + Retryable: nil, + Error: errors.New("some error"), + }, + expected: false, + }, + { + name: "closed connection error", + r: &request.Request{ + Retryable: nil, + Error: awserr.New("5xx", "use of closed network connection", nil), + }, + expected: true, + }, + { + name: "closed connection error (non nil origError)", + r: &request.Request{ + Retryable: nil, + Error: awserr.New("5xx", "use of closed network connection", errors.New("some error")), + }, + expected: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + retryer := &ClosedConnectionRetryer{&testRetryer{}} + msg := "" + if test.r.Error != nil { + if awsErr, ok := test.r.Error.(awserr.Error); ok { + msg = awsErr.Error() + } + } + assert.Equal(t, test.expected, retryer.ShouldRetry(test.r), msg) + }) + } +} diff --git a/go/vt/mysqlctl/s3backupstorage/s3.go b/go/vt/mysqlctl/s3backupstorage/s3.go index 55fe11f6942..e01c8c678a5 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3.go +++ b/go/vt/mysqlctl/s3backupstorage/s3.go @@ -41,6 +41,7 @@ import ( "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/aws/aws-sdk-go/service/s3/s3manager" "golang.org/x/net/context" @@ -85,7 +86,7 @@ var logNameMap logNameToLogLevel // S3BackupHandle implements the backupstorage.BackupHandle interface. type S3BackupHandle struct { - client *s3.S3 + client s3iface.S3API bs *S3BackupStorage dir string name string @@ -104,6 +105,21 @@ func (bh *S3BackupHandle) Name() string { return bh.name } +// RecordError is part of the concurrency.ErrorRecorder interface. +func (bh *S3BackupHandle) RecordError(err error) { + bh.errors.RecordError(err) +} + +// HasErrors is part of the concurrency.ErrorRecorder interface. +func (bh *S3BackupHandle) HasErrors() bool { + return bh.errors.HasErrors() +} + +// Error is part of the concurrency.ErrorRecorder interface. +func (bh *S3BackupHandle) Error() error { + return bh.errors.Error() +} + // AddFile is part of the backupstorage.BackupHandle interface. func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) { if bh.readOnly { @@ -143,7 +159,7 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize }) if err != nil { reader.CloseWithError(err) - bh.errors.RecordError(err) + bh.RecordError(err) } }() @@ -156,7 +172,7 @@ func (bh *S3BackupHandle) EndBackup(ctx context.Context) error { return fmt.Errorf("EndBackup cannot be called on read-only backup") } bh.waitGroup.Wait() - return bh.errors.Error() + return bh.Error() } // AbortBackup is part of the backupstorage.BackupHandle interface. diff --git a/go/vt/mysqlctl/s3backupstorage/s3_test.go b/go/vt/mysqlctl/s3backupstorage/s3_test.go new file mode 100644 index 00000000000..25d958934f7 --- /dev/null +++ b/go/vt/mysqlctl/s3backupstorage/s3_test.go @@ -0,0 +1,43 @@ +package s3backupstorage + +import ( + "errors" + "net/http" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type s3ErrorClient struct{ s3iface.S3API } + +func (s3errclient *s3ErrorClient) PutObjectRequest(in *s3.PutObjectInput) (*request.Request, *s3.PutObjectOutput) { + req := request.Request{ + HTTPRequest: &http.Request{}, // without this we segfault \_(ツ)_/¯ (see https://github.com/aws/aws-sdk-go/blob/v1.28.8/aws/request/request_context.go#L13) + Error: errors.New("some error"), // this forces req.Send() (which is called by the uploader) to always return non-nil error + } + + return &req, &s3.PutObjectOutput{} +} + +func TestAddFileError(t *testing.T) { + bh := &S3BackupHandle{client: &s3ErrorClient{}, readOnly: false} + + wc, err := bh.AddFile(aws.BackgroundContext(), "somefile", 100000) + require.NoErrorf(t, err, "AddFile() expected no error, got %s", err) + assert.NotNil(t, wc, "AddFile() expected non-nil WriteCloser") + + n, err := wc.Write([]byte("here are some bytes")) + require.NoErrorf(t, err, "TestAddFile() could not write to uploader, got %d bytes written, err %s", n, err) + + err = wc.Close() + require.NoErrorf(t, err, "TestAddFile() could not close writer, got %s", err) + + bh.waitGroup.Wait() // wait for the goroutine to finish, at which point it should have recorded an error + + require.Equal(t, bh.HasErrors(), true, "AddFile() expected bh to record async error but did not") +} diff --git a/go/vt/servenv/exporter.go b/go/vt/servenv/exporter.go index 61a12b70ea6..969c12a4ebc 100644 --- a/go/vt/servenv/exporter.go +++ b/go/vt/servenv/exporter.go @@ -50,6 +50,11 @@ var ( // name causes that Exporter to be reused. exporters = make(map[string]*Exporter) + // unnamedExports contain variables that were exported using + // an unnamed exporter. If there is a name collision here, we + // just reuse the unnamed variable. + unnamedExports = make(map[string]expvar.Var) + // exportedMultiCountVars contains the merged stats vars created for the vars that support Counts. exportedMultiCountVars = make(map[string]*multiCountVars) @@ -183,7 +188,9 @@ func (e *Exporter) NewCountersFuncWithMultiLabels(name, help string, labels []st // If e.name is empty, it's a pass-through. // If name is empty, it's an unexported var. if e.name == "" || name == "" { - return stats.NewCountersFuncWithMultiLabels(name, help, labels, f) + v := stats.NewCountersFuncWithMultiLabels(name, help, labels, f) + addUnnamedExport(name, v) + return v } lvar := stats.NewCountersFuncWithMultiLabels("", help, labels, f) _ = e.createCountsTracker(name, help, labels, lvar, replaceOnDup, typeCounter) @@ -194,6 +201,13 @@ func (e *Exporter) createCountsTracker(name, help string, labels []string, lvar exporterMu.Lock() defer exporterMu.Unlock() + if c, ok := unnamedExports[name]; ok { + if typ == typeCounter { + return c.(multiCountVar) + } + return nil + } + if evar, ok := exportedMultiCountVars[name]; ok { evar.mu.Lock() defer evar.mu.Unlock() @@ -221,7 +235,9 @@ func (e *Exporter) createCountsTracker(name, help string, labels []string, lvar // NewGaugesFuncWithMultiLabels creates a name-spaced equivalent for stats.NewGaugesFuncWithMultiLabels. func (e *Exporter) NewGaugesFuncWithMultiLabels(name, help string, labels []string, f func() map[string]int64) *stats.GaugesFuncWithMultiLabels { if e.name == "" || name == "" { - return stats.NewGaugesFuncWithMultiLabels(name, help, labels, f) + v := stats.NewGaugesFuncWithMultiLabels(name, help, labels, f) + addUnnamedExport(name, v) + return v } lvar := stats.NewGaugesFuncWithMultiLabels("", help, labels, f) _ = e.createCountsTracker(name, help, labels, lvar, replaceOnDup, typeGauge) @@ -231,7 +247,9 @@ func (e *Exporter) NewGaugesFuncWithMultiLabels(name, help string, labels []stri // NewCounter creates a name-spaced equivalent for stats.NewCounter. func (e *Exporter) NewCounter(name string, help string) *stats.Counter { if e.name == "" || name == "" { - return stats.NewCounter(name, help) + v := stats.NewCounter(name, help) + addUnnamedExport(name, v) + return v } lvar := stats.NewCounter("", help) if exists := e.createCountTracker(name, help, lvar, reuseOnDup, typeCounter); exists != nil { @@ -244,6 +262,13 @@ func (e *Exporter) createCountTracker(name, help string, lvar singleCountVar, on exporterMu.Lock() defer exporterMu.Unlock() + if c, ok := unnamedExports[name]; ok { + if typ == typeCounter { + return c.(singleCountVar) + } + return nil + } + if evar, ok := exportedSingleCountVars[name]; ok { evar.mu.Lock() defer evar.mu.Unlock() @@ -271,7 +296,9 @@ func (e *Exporter) createCountTracker(name, help string, lvar singleCountVar, on // NewGauge creates a name-spaced equivalent for stats.NewGauge. func (e *Exporter) NewGauge(name string, help string) *stats.Gauge { if e.name == "" || name == "" { - return stats.NewGauge(name, help) + v := stats.NewGauge(name, help) + addUnnamedExport(name, v) + return v } lvar := stats.NewGauge("", help) if exists := e.createCountTracker(name, help, lvar, reuseOnDup, typeCounter); exists != nil { @@ -283,7 +310,9 @@ func (e *Exporter) NewGauge(name string, help string) *stats.Gauge { // NewCounterFunc creates a name-spaced equivalent for stats.NewCounterFunc. func (e *Exporter) NewCounterFunc(name string, help string, f func() int64) *stats.CounterFunc { if e.name == "" || name == "" { - return stats.NewCounterFunc(name, help, f) + v := stats.NewCounterFunc(name, help, f) + addUnnamedExport(name, v) + return v } lvar := stats.NewCounterFunc("", help, f) _ = e.createCountTracker(name, help, lvar, replaceOnDup, typeCounter) @@ -293,7 +322,9 @@ func (e *Exporter) NewCounterFunc(name string, help string, f func() int64) *sta // NewGaugeFunc creates a name-spaced equivalent for stats.NewGaugeFunc. func (e *Exporter) NewGaugeFunc(name string, help string, f func() int64) *stats.GaugeFunc { if e.name == "" || name == "" { - return stats.NewGaugeFunc(name, help, f) + v := stats.NewGaugeFunc(name, help, f) + addUnnamedExport(name, v) + return v } lvar := stats.NewGaugeFunc("", help, f) _ = e.createCountTracker(name, help, lvar, replaceOnDup, typeGauge) @@ -303,7 +334,9 @@ func (e *Exporter) NewGaugeFunc(name string, help string, f func() int64) *stats // NewCounterDurationFunc creates a name-spaced equivalent for stats.NewCounterDurationFunc. func (e *Exporter) NewCounterDurationFunc(name string, help string, f func() time.Duration) *stats.CounterDurationFunc { if e.name == "" || name == "" { - return stats.NewCounterDurationFunc(name, help, f) + v := stats.NewCounterDurationFunc(name, help, f) + addUnnamedExport(name, v) + return v } lvar := stats.NewCounterDurationFunc("", help, f) _ = e.createCountTracker(name, help, lvar, replaceOnDup, typeCounter) @@ -313,7 +346,9 @@ func (e *Exporter) NewCounterDurationFunc(name string, help string, f func() tim // NewGaugeDurationFunc creates a name-spaced equivalent for stats.NewGaugeDurationFunc. func (e *Exporter) NewGaugeDurationFunc(name string, help string, f func() time.Duration) *stats.GaugeDurationFunc { if e.name == "" || name == "" { - return stats.NewGaugeDurationFunc(name, help, f) + v := stats.NewGaugeDurationFunc(name, help, f) + addUnnamedExport(name, v) + return v } lvar := stats.NewGaugeDurationFunc("", help, f) _ = e.createCountTracker(name, help, lvar, replaceOnDup, typeGauge) @@ -324,7 +359,9 @@ func (e *Exporter) NewGaugeDurationFunc(name string, help string, f func() time. // Tags are ignored if the exporter is named. func (e *Exporter) NewCountersWithSingleLabel(name, help string, label string, tags ...string) *stats.CountersWithSingleLabel { if e.name == "" || name == "" { - return stats.NewCountersWithSingleLabel(name, help, label, tags...) + v := stats.NewCountersWithSingleLabel(name, help, label, tags...) + addUnnamedExport(name, v) + return v } lvar := stats.NewCountersWithSingleLabel("", help, label) if exists := e.createCountsTracker(name, help, []string{label}, lvar, reuseOnDup, typeCounter); exists != nil { @@ -337,7 +374,9 @@ func (e *Exporter) NewCountersWithSingleLabel(name, help string, label string, t // Tags are ignored if the exporter is named. func (e *Exporter) NewGaugesWithSingleLabel(name, help string, label string, tags ...string) *stats.GaugesWithSingleLabel { if e.name == "" || name == "" { - return stats.NewGaugesWithSingleLabel(name, help, label, tags...) + v := stats.NewGaugesWithSingleLabel(name, help, label, tags...) + addUnnamedExport(name, v) + return v } lvar := stats.NewGaugesWithSingleLabel("", help, label) @@ -350,7 +389,9 @@ func (e *Exporter) NewGaugesWithSingleLabel(name, help string, label string, tag // NewCountersWithMultiLabels creates a name-spaced equivalent for stats.NewCountersWithMultiLabels. func (e *Exporter) NewCountersWithMultiLabels(name, help string, labels []string) *stats.CountersWithMultiLabels { if e.name == "" || name == "" { - return stats.NewCountersWithMultiLabels(name, help, labels) + v := stats.NewCountersWithMultiLabels(name, help, labels) + addUnnamedExport(name, v) + return v } lvar := stats.NewCountersWithMultiLabels("", help, labels) @@ -363,7 +404,9 @@ func (e *Exporter) NewCountersWithMultiLabels(name, help string, labels []string // NewGaugesWithMultiLabels creates a name-spaced equivalent for stats.NewGaugesWithMultiLabels. func (e *Exporter) NewGaugesWithMultiLabels(name, help string, labels []string) *stats.GaugesWithMultiLabels { if e.name == "" || name == "" { - return stats.NewGaugesWithMultiLabels(name, help, labels) + v := stats.NewGaugesWithMultiLabels(name, help, labels) + addUnnamedExport(name, v) + return v } lvar := stats.NewGaugesWithMultiLabels("", help, labels) @@ -377,14 +420,22 @@ func (e *Exporter) NewGaugesWithMultiLabels(name, help string, labels []string) // The function currently just returns an unexported variable. func (e *Exporter) NewTimings(name string, help string, label string) *TimingsWrapper { if e.name == "" || name == "" { - return &TimingsWrapper{ + v := &TimingsWrapper{ timings: stats.NewMultiTimings(name, help, []string{label}), } + addUnnamedExport(name, v.timings) + return v } exporterMu.Lock() defer exporterMu.Unlock() + if v, ok := unnamedExports[name]; ok { + return &TimingsWrapper{ + timings: v.(*stats.MultiTimings), + } + } + if tv, ok := exportedTimingsVars[name]; ok { return &TimingsWrapper{ name: e.name, @@ -403,14 +454,22 @@ func (e *Exporter) NewTimings(name string, help string, label string) *TimingsWr // The function currently just returns an unexported variable. func (e *Exporter) NewMultiTimings(name string, help string, labels []string) *MultiTimingsWrapper { if e.name == "" || name == "" { - return &MultiTimingsWrapper{ + v := &MultiTimingsWrapper{ timings: stats.NewMultiTimings(name, help, labels), } + addUnnamedExport(name, v.timings) + return v } exporterMu.Lock() defer exporterMu.Unlock() + if v, ok := unnamedExports[name]; ok { + return &MultiTimingsWrapper{ + timings: v.(*stats.MultiTimings), + } + } + if tv, ok := exportedTimingsVars[name]; ok { return &MultiTimingsWrapper{ name: e.name, @@ -429,11 +488,18 @@ func (e *Exporter) NewMultiTimings(name string, help string, labels []string) *M // The function currently just returns an unexported variable. func (e *Exporter) NewRates(name string, singleCountVar multiCountVar, samples int, interval time.Duration) *stats.Rates { if e.name == "" || name == "" { - return stats.NewRates(name, singleCountVar, samples, interval) + v := stats.NewRates(name, singleCountVar, samples, interval) + addUnnamedExport(name, v) + return v } exporterMu.Lock() defer exporterMu.Unlock() + + if v, ok := unnamedExports[name]; ok { + return v.(*stats.Rates) + } + ov, ok := exportedOtherStatsVars[name] if !ok { ov = expvar.NewMap(name) @@ -452,7 +518,9 @@ func (e *Exporter) NewRates(name string, singleCountVar multiCountVar, samples i // The function currently just returns an unexported variable. func (e *Exporter) NewHistogram(name, help string, cutoffs []int64) *stats.Histogram { if e.name == "" || name == "" { - return stats.NewHistogram(name, help, cutoffs) + v := stats.NewHistogram(name, help, cutoffs) + addUnnamedExport(name, v) + return v } hist := stats.NewHistogram("", help, cutoffs) e.addToOtherVars(name, hist) @@ -463,6 +531,7 @@ func (e *Exporter) NewHistogram(name, help string, cutoffs []int64) *stats.Histo // The function just passes through if the Exporter name is empty. func (e *Exporter) Publish(name string, v expvar.Var) { if e.name == "" || name == "" { + addUnnamedExport(name, v) stats.Publish(name, v) return } @@ -473,6 +542,10 @@ func (e *Exporter) addToOtherVars(name string, v expvar.Var) { exporterMu.Lock() defer exporterMu.Unlock() + if _, ok := unnamedExports[name]; ok { + return + } + ov, ok := exportedOtherStatsVars[name] if !ok { ov = expvar.NewMap(name) @@ -620,6 +693,15 @@ func (hf *handleFunc) Get() func(w http.ResponseWriter, r *http.Request) { //----------------------------------------------------------------- +func addUnnamedExport(name string, v expvar.Var) { + if name == "" { + return + } + exporterMu.Lock() + unnamedExports[name] = v + exporterMu.Unlock() +} + func combineLabels(label string, labels []string) []string { return append(append(make([]string, 0, len(labels)+1), label), labels...) } diff --git a/go/vt/servenv/exporter_test.go b/go/vt/servenv/exporter_test.go index 123e7f3bab0..aa86c2679ae 100644 --- a/go/vt/servenv/exporter_test.go +++ b/go/vt/servenv/exporter_test.go @@ -108,6 +108,10 @@ func TestCountersFuncWithMultiLabels(t *testing.T) { ebd.NewCountersFuncWithMultiLabels("", "", []string{"l"}, func() map[string]int64 { return map[string]int64{"a": 2} }) ebd.NewCountersFuncWithMultiLabels("", "", []string{"l"}, func() map[string]int64 { return map[string]int64{"a": 3} }) + // Ensure reuse of global var is ignored. + ebd.NewCountersFuncWithMultiLabels("gcfwml", "", []string{"l"}, func() map[string]int64 { return map[string]int64{"a": 2} }) + assert.Equal(t, `{"a": 1}`, expvar.Get("gcfwml").String()) + ebd.NewCountersFuncWithMultiLabels("lcfwml", "", []string{"l"}, func() map[string]int64 { return map[string]int64{"a": 4} }) assert.Equal(t, `{"i1.a": 4}`, expvar.Get("lcfwml").String()) @@ -132,6 +136,10 @@ func TestGaugesFuncWithMultiLabels(t *testing.T) { ebd.NewGaugesFuncWithMultiLabels("", "", []string{"l"}, func() map[string]int64 { return map[string]int64{"a": 2} }) ebd.NewGaugesFuncWithMultiLabels("", "", []string{"l"}, func() map[string]int64 { return map[string]int64{"a": 3} }) + // Ensure reuse of global var is ignored. + ebd.NewGaugesFuncWithMultiLabels("ggfwml", "", []string{"l"}, func() map[string]int64 { return map[string]int64{"a": 1} }) + assert.Equal(t, `{"a": 1}`, expvar.Get("ggfwml").String()) + ebd.NewGaugesFuncWithMultiLabels("lgfwml", "", []string{"l"}, func() map[string]int64 { return map[string]int64{"a": 4} }) assert.Equal(t, `{"i1.a": 4}`, expvar.Get("lgfwml").String()) @@ -157,6 +165,11 @@ func TestCounter(t *testing.T) { ebd.NewCounter("", "") ebd.NewCounter("", "") + // Ensure global var gets reused. + c = ebd.NewCounter("gcounter", "") + c.Add(1) + assert.Equal(t, "2", expvar.Get("gcounter").String()) + c = ebd.NewCounter("lcounter", "") c.Add(4) assert.Equal(t, `{"i1": 4}`, expvar.Get("lcounter").String()) @@ -185,6 +198,11 @@ func TestGauge(t *testing.T) { ebd.NewGauge("", "") ebd.NewGauge("", "") + // Ensure global var gets reused. + c = ebd.NewGauge("ggauge", "") + c.Set(2) + assert.Equal(t, "2", expvar.Get("ggauge").String()) + c = ebd.NewGauge("lgauge", "") c.Set(4) assert.Equal(t, `{"i1": 4}`, expvar.Get("lgauge").String()) @@ -213,6 +231,10 @@ func TestCounterFunc(t *testing.T) { ebd.NewCounterFunc("", "", func() int64 { return 2 }) ebd.NewCounterFunc("", "", func() int64 { return 3 }) + // Ensure reuse of global var is ignored. + ebd.NewCounterFunc("gcf", "", func() int64 { return 2 }) + assert.Equal(t, "1", expvar.Get("gcf").String()) + ebd.NewCounterFunc("lcf", "", func() int64 { return 4 }) assert.Equal(t, `{"i1": 4}`, expvar.Get("lcf").String()) @@ -237,6 +259,10 @@ func TestGaugeFunc(t *testing.T) { ebd.NewGaugeFunc("", "", func() int64 { return 2 }) ebd.NewGaugeFunc("", "", func() int64 { return 3 }) + // Ensure reuse of global var is ignored. + ebd.NewGaugeFunc("ggf", "", func() int64 { return 2 }) + assert.Equal(t, "1", expvar.Get("ggf").String()) + ebd.NewGaugeFunc("lgf", "", func() int64 { return 4 }) assert.Equal(t, `{"i1": 4}`, expvar.Get("lgf").String()) @@ -261,6 +287,10 @@ func TestCounterDurationFunc(t *testing.T) { ebd.NewCounterDurationFunc("", "", func() time.Duration { return 2 }) ebd.NewCounterDurationFunc("", "", func() time.Duration { return 3 }) + // Ensure reuse of global var is ignored. + ebd.NewCounterDurationFunc("gcduration", "", func() time.Duration { return 2 }) + assert.Equal(t, "1", expvar.Get("gcduration").String()) + ebd.NewCounterDurationFunc("lcduration", "", func() time.Duration { return 4 }) assert.Equal(t, `{"i1": 4}`, expvar.Get("lcduration").String()) @@ -285,6 +315,10 @@ func TestGaugeDurationFunc(t *testing.T) { ebd.NewGaugeDurationFunc("", "", func() time.Duration { return 2 }) ebd.NewGaugeDurationFunc("", "", func() time.Duration { return 3 }) + // Ensure reuse of global var is ignored. + ebd.NewGaugeDurationFunc("ggduration", "", func() time.Duration { return 2 }) + assert.Equal(t, "1", expvar.Get("ggduration").String()) + ebd.NewGaugeDurationFunc("lgduration", "", func() time.Duration { return 4 }) assert.Equal(t, `{"i1": 4}`, expvar.Get("lgduration").String()) @@ -310,6 +344,11 @@ func TestCountersWithSingleLabel(t *testing.T) { ebd.NewCountersWithSingleLabel("", "", "l") ebd.NewCountersWithSingleLabel("", "", "l") + // Ensure global var gets reused. + g = ebd.NewCountersWithSingleLabel("gcwsl", "", "l") + g.Add("a", 1) + assert.Equal(t, `{"a": 2}`, expvar.Get("gcwsl").String()) + g = ebd.NewCountersWithSingleLabel("lcwsl", "", "l") g.Add("a", 4) assert.Equal(t, `{"i1.a": 4}`, expvar.Get("lcwsl").String()) @@ -338,6 +377,11 @@ func TestGaugesWithSingleLabel(t *testing.T) { ebd.NewGaugesWithSingleLabel("", "", "l") ebd.NewGaugesWithSingleLabel("", "", "l") + // Ensure reuse of global var is ignored. + g = ebd.NewGaugesWithSingleLabel("ggwsl", "", "l") + g.Set("a", 2) + assert.Equal(t, `{"a": 1}`, expvar.Get("ggwsl").String()) + g = ebd.NewGaugesWithSingleLabel("lgwsl", "", "l") g.Set("a", 4) assert.Equal(t, `{"i1.a": 4}`, expvar.Get("lgwsl").String()) @@ -367,6 +411,11 @@ func TestCountersWithMultiLabels(t *testing.T) { ebd.NewCountersWithMultiLabels("", "", []string{"l"}) ebd.NewCountersWithMultiLabels("", "", []string{"l"}) + // Ensure global var gets reused. + g = ebd.NewCountersWithMultiLabels("gcwml", "", []string{"l"}) + g.Add([]string{"a"}, 1) + assert.Equal(t, `{"a": 2}`, expvar.Get("gcwml").String()) + g = ebd.NewCountersWithMultiLabels("lcwml", "", []string{"l"}) g.Add([]string{"a"}, 4) assert.Equal(t, `{"i1.a": 4}`, expvar.Get("lcwml").String()) @@ -395,6 +444,11 @@ func TestGaugesWithMultiLabels(t *testing.T) { ebd.NewGaugesWithMultiLabels("", "", []string{"l"}) ebd.NewGaugesWithMultiLabels("", "", []string{"l"}) + // Ensure reuse of global var is ignored. + g = ebd.NewGaugesWithMultiLabels("ggwml", "", []string{"l"}) + g.Set([]string{"a"}, 2) + assert.Equal(t, `{"a": 1}`, expvar.Get("ggwml").String()) + g = ebd.NewGaugesWithMultiLabels("lgwml", "", []string{"l"}) g.Set([]string{"a"}, 4) assert.Equal(t, `{"i1.a": 4}`, expvar.Get("lgwml").String()) @@ -426,6 +480,11 @@ func TestTimings(t *testing.T) { ebd.NewTimings("", "", "l") ebd.NewTimings("", "", "l") + // Ensure global var gets reused. + g = ebd.NewTimings("gtimings", "", "l") + g.Add("a", 1) + assert.Contains(t, expvar.Get("gtimings").String(), `"TotalCount":3`) + g = ebd.NewTimings("ltimings", "", "l") g.Add("a", 1) g.Add("a", 1) @@ -469,6 +528,11 @@ func TestMultiTimings(t *testing.T) { ebd.NewMultiTimings("", "", []string{"l"}) ebd.NewMultiTimings("", "", []string{"l"}) + // Ensure global var gets reused. + g = ebd.NewMultiTimings("gmtimings", "", []string{"l"}) + g.Add([]string{"a"}, 1) + assert.Contains(t, expvar.Get("gmtimings").String(), `"TotalCount":3`) + g = ebd.NewMultiTimings("lmtimings", "", []string{"l"}) g.Add([]string{"a"}, 1) g.Add([]string{"a"}, 1) @@ -510,6 +574,10 @@ func TestRates(t *testing.T) { ebd.NewRates("", tm, 15*60/5, 5*time.Second) ebd.NewRates("", tm, 15*60/5, 5*time.Second) + // Ensure global var gets reused. + ebd.NewRates("grates", tm, 15*60/5, 5*time.Second) + assert.Equal(t, "{}", expvar.Get("grates").String()) + // Ensure var gets reused. rates1 := ebd.NewRates("lrates", tm, 15*60/5, 5*time.Second) rates2 := ebd.NewRates("lrates", tm, 15*60/5, 5*time.Second) @@ -532,6 +600,9 @@ func TestHistogram(t *testing.T) { ebd.NewHistogram("", "", []int64{10}) ebd.NewHistogram("", "", []int64{10}) + // Ensure reuse of global var doesn't panic. + _ = ebd.NewHistogram("ghistogram", "", []int64{10}) + g = ebd.NewHistogram("lhistogram", "", []int64{10}) g.Add(1) g.Add(1) @@ -556,6 +627,9 @@ func TestPublish(t *testing.T) { ebd.Publish("lpub", s) assert.Equal(t, `{"i1": "1"}`, expvar.Get("lpub").String()) + // Ensure reuse of global var doesn't panic. + ebd.Publish("gpub", s) + ebd = NewExporter("i2", "label") ebd.Publish("lpub", s) assert.Contains(t, expvar.Get("lpub").String(), `"i1": "1"`) diff --git a/go/vt/sqlparser/comments.go b/go/vt/sqlparser/comments.go index e8cd4b87652..cbcc44a32fc 100644 --- a/go/vt/sqlparser/comments.go +++ b/go/vt/sqlparser/comments.go @@ -32,6 +32,8 @@ const ( DirectiveQueryTimeout = "QUERY_TIMEOUT_MS" // DirectiveScatterErrorsAsWarnings enables partial success scatter select queries DirectiveScatterErrorsAsWarnings = "SCATTER_ERRORS_AS_WARNINGS" + // DirectiveIgnoreMaxPayloadSize skips payload size validation when set. + DirectiveIgnoreMaxPayloadSize = "IGNORE_MAX_PAYLOAD_SIZE" ) func isNonSpace(r rune) bool { @@ -295,3 +297,24 @@ func SkipQueryPlanCacheDirective(stmt Statement) bool { } return false } + +// IgnoreMaxPayloadSizeDirective returns true if the max payload size override +// directive is set to true. +func IgnoreMaxPayloadSizeDirective(stmt Statement) bool { + switch stmt := stmt.(type) { + case *Select: + directives := ExtractCommentDirectives(stmt.Comments) + return directives.IsSet(DirectiveIgnoreMaxPayloadSize) + case *Insert: + directives := ExtractCommentDirectives(stmt.Comments) + return directives.IsSet(DirectiveIgnoreMaxPayloadSize) + case *Update: + directives := ExtractCommentDirectives(stmt.Comments) + return directives.IsSet(DirectiveIgnoreMaxPayloadSize) + case *Delete: + directives := ExtractCommentDirectives(stmt.Comments) + return directives.IsSet(DirectiveIgnoreMaxPayloadSize) + default: + return false + } +} diff --git a/go/vt/sqlparser/comments_test.go b/go/vt/sqlparser/comments_test.go index 3d875faf1cb..8ec2a0e1995 100644 --- a/go/vt/sqlparser/comments_test.go +++ b/go/vt/sqlparser/comments_test.go @@ -17,8 +17,11 @@ limitations under the License. package sqlparser import ( + "fmt" "reflect" "testing" + + "github.com/stretchr/testify/assert" ) func TestSplitComments(t *testing.T) { @@ -385,3 +388,22 @@ func TestSkipQueryPlanCacheDirective(t *testing.T) { t.Errorf("d.SkipQueryPlanCacheDirective(stmt) should be true") } } + +func TestIgnoreMaxPayloadSizeDirective(t *testing.T) { + testCases := []struct { + query string + expected bool + }{ + {"insert /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ into user(id) values (1), (2)", true}, + {"insert into user(id) values (1), (2)", false}, + {"update /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ users set name=1", true}, + {"select /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ * from users", true}, + {"delete /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ from users", true}, + } + + for _, test := range testCases { + stmt, _ := Parse(test.query) + got := IgnoreMaxPayloadSizeDirective(stmt) + assert.Equalf(t, test.expected, got, fmt.Sprintf("d.IgnoreMaxPayloadSizeDirective(stmt) returned %v but expected %v", got, test.expected)) + } +} diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 3949e22843e..bd1a6280c92 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -397,7 +397,7 @@ var commands = []commandGroup{ "[-exclude_tables=''] [-include-views] ", "Validates that the master schema matches all of the slaves."}, {"ValidateSchemaKeyspace", commandValidateSchemaKeyspace, - "[-exclude_tables=''] [-include-views] ", + "[-exclude_tables=''] [-include-views] [-skip-no-master] ", "Validates that the master schema from shard 0 matches the schema on all of the other tablets in the keyspace."}, {"ApplySchema", commandApplySchema, "[-allow_long_unavailability] [-wait_slave_timeout=10s] {-sql= || -sql-file=} ", @@ -2305,6 +2305,7 @@ func commandValidateSchemaShard(ctx context.Context, wr *wrangler.Wrangler, subF func commandValidateSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { excludeTables := subFlags.String("exclude_tables", "", "Specifies a comma-separated list of tables to exclude. Each is either an exact match, or a regular expression of the form /regexp/") includeViews := subFlags.Bool("include-views", false, "Includes views in the validation") + skipNoMaster := subFlags.Bool("skip-no-master", false, "Skip shards that don't have master when performing validation") if err := subFlags.Parse(args); err != nil { return err } @@ -2317,7 +2318,7 @@ func commandValidateSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, s if *excludeTables != "" { excludeTableArray = strings.Split(*excludeTables, ",") } - return wr.ValidateSchemaKeyspace(ctx, keyspace, excludeTableArray, *includeViews) + return wr.ValidateSchemaKeyspace(ctx, keyspace, excludeTableArray, *includeViews, *skipNoMaster) } func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { diff --git a/go/vt/vtctld/vtctld.go b/go/vt/vtctld/vtctld.go index fe6c2347372..7dde2434273 100644 --- a/go/vt/vtctld/vtctld.go +++ b/go/vt/vtctld/vtctld.go @@ -59,7 +59,7 @@ func InitVtctld(ts *topo.Server) { actionRepo.RegisterKeyspaceAction("ValidateSchemaKeyspace", func(ctx context.Context, wr *wrangler.Wrangler, keyspace string, r *http.Request) (string, error) { - return "", wr.ValidateSchemaKeyspace(ctx, keyspace, nil, false) + return "", wr.ValidateSchemaKeyspace(ctx, keyspace, nil, false, false) }) actionRepo.RegisterKeyspaceAction("ValidateVersionKeyspace", diff --git a/go/vt/vtgate/discoverygateway.go b/go/vt/vtgate/discoverygateway.go index 474ec99d35f..5cb17bbe6fb 100644 --- a/go/vt/vtgate/discoverygateway.go +++ b/go/vt/vtgate/discoverygateway.go @@ -39,20 +39,20 @@ import ( "vitess.io/vitess/go/vt/vttablet/queryservice" querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/topodata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo/topoproto" ) var ( - cellsToWatch = flag.String("cells_to_watch", "", "comma-separated list of cells for watching tablets") - refreshInterval = flag.Duration("tablet_refresh_interval", 1*time.Minute, "tablet refresh interval") - refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes") - topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads") - - allowedTabletTypes []topodatapb.TabletType - - tabletFilters flagutil.StringListValue + cellsToWatch = flag.String("cells_to_watch", "", "comma-separated list of cells for watching tablets") + tabletFilters flagutil.StringListValue + refreshInterval = flag.Duration("tablet_refresh_interval", 1*time.Minute, "tablet refresh interval") + refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes") + topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads") + allowedTabletTypes []topodatapb.TabletType + routeReplicaToRdonly = flag.Bool("gateway_route_replica_to_rdonly", false, "route REPLICA queries to RDONLY tablets as well as REPLICA tablets") ) const ( @@ -282,6 +282,12 @@ func (dg *discoveryGateway) withRetry(ctx context.Context, target *querypb.Targe } tablets := dg.tsc.GetHealthyTabletStats(target.Keyspace, target.Shard, target.TabletType) + + // temporary hack to enable REPLICA type queries to address both REPLICA tablets and RDONLY tablets + if *routeReplicaToRdonly && target.TabletType == topodata.TabletType_REPLICA { + tablets = append(tablets, dg.tsc.GetHealthyTabletStats(target.Keyspace, target.Shard, topodata.TabletType_RDONLY)...) + } + if len(tablets) == 0 { // fail fast if there is no tablet err = vterrors.New(vtrpcpb.Code_UNAVAILABLE, "no valid tablet") diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index d09c2a273ae..c7227303a5a 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -1235,10 +1235,14 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser. return nil, err } - // Normalize if possible and retry. query := sql statement := stmt bindVarNeeds := sqlparser.BindVarNeeds{} + if !sqlparser.IgnoreMaxPayloadSizeDirective(statement) && !isValidPayloadSize(query) { + return nil, vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "query payload size above threshold") + } + + // Normalize if possible and retry. if (e.normalize && sqlparser.CanNormalize(stmt)) || sqlparser.IsSetStatement(stmt) { parameterize := e.normalize // the public flag is called normalize result, err := sqlparser.PrepareAST(stmt, bindVars, "vtg", parameterize) @@ -1447,6 +1451,21 @@ func checkLikeOpt(likeOpt string, colNames []string) (string, error) { return "", nil } +// isValidPayloadSize validates whether a query payload is above the +// configured MaxPayloadSize threshold. The WarnPayloadSizeExceeded will increment +// if the payload size exceeds the warnPayloadSize. + +func isValidPayloadSize(query string) bool { + payloadSize := len(query) + if *maxPayloadSize > 0 && payloadSize > *maxPayloadSize { + return false + } + if *warnPayloadSize > 0 && payloadSize > *warnPayloadSize { + warnings.Add("WarnPayloadSizeExceeded", 1) + } + return true +} + // Prepare executes a prepare statements. func (e *Executor) Prepare(ctx context.Context, method string, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable) (fld []*querypb.Field, err error) { logStats := NewLogStats(ctx, method, sql, bindVars) diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index fda3e46c3dd..00342f8ea8c 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -2313,6 +2313,52 @@ func TestGenerateCharsetRows(t *testing.T) { } } +func TestExecutorMaxPayloadSizeExceeded(t *testing.T) { + saveMax := *maxPayloadSize + saveWarn := *warnPayloadSize + *maxPayloadSize = 10 + *warnPayloadSize = 5 + defer func() { + *maxPayloadSize = saveMax + *warnPayloadSize = saveWarn + }() + + executor, _, _, _ := createExecutorEnv() + session := NewSafeSession(&vtgatepb.Session{TargetString: "@master"}) + warningCount := warnings.Counts()["WarnPayloadSizeExceeded"] + testMaxPayloadSizeExceeded := []string{ + "select * from main1", + "insert into main1(id) values (1), (2)", + "update main1 set id=1", + "delete from main1 where id=1", + } + for _, query := range testMaxPayloadSizeExceeded { + _, err := executor.Execute(context.Background(), "TestExecutorMaxPayloadSizeExceeded", session, query, nil) + require.NotNil(t, err) + assert.EqualError(t, err, "query payload size above threshold") + } + assert.Equal(t, warningCount, warnings.Counts()["WarnPayloadSizeExceeded"], "warnings count") + + testMaxPayloadSizeOverride := []string{ + "select /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ * from main1", + "insert /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ into main1(id) values (1), (2)", + "update /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ main1 set id=1", + "delete /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ from main1 where id=1", + } + for _, query := range testMaxPayloadSizeOverride { + _, err := executor.Execute(context.Background(), "TestExecutorMaxPayloadSizeWithOverride", session, query, nil) + assert.Equal(t, nil, err, "err should be nil") + } + assert.Equal(t, warningCount, warnings.Counts()["WarnPayloadSizeExceeded"], "warnings count") + + *maxPayloadSize = 1000 + for _, query := range testMaxPayloadSizeExceeded { + _, err := executor.Execute(context.Background(), "TestExecutorMaxPayloadSizeExceeded", session, query, nil) + assert.Equal(t, nil, err, "err should be nil") + } + assert.Equal(t, warningCount+4, warnings.Counts()["WarnPayloadSizeExceeded"], "warnings count") +} + func TestOlapSelectDatabase(t *testing.T) { executor, _, _, _ := createExecutorEnv() executor.normalize = true diff --git a/go/vt/vtgate/logstats.go b/go/vt/vtgate/logstats.go index 47f34e2d7ef..dfcc24c9e08 100644 --- a/go/vt/vtgate/logstats.go +++ b/go/vt/vtgate/logstats.go @@ -39,7 +39,9 @@ import ( type LogStats struct { Ctx context.Context Method string - Target *querypb.Target + Keyspace string + TabletType string + Table string StmtType string SQL string BindVariables map[string]*querypb.BindVariable @@ -151,9 +153,9 @@ func (stats *LogStats) Logf(w io.Writer, params url.Values) error { var fmtString string switch *streamlog.QueryLogFormat { case streamlog.QueryLogFormatText: - fmtString = "%v\t%v\t%v\t'%v'\t'%v'\t%v\t%v\t%.6f\t%.6f\t%.6f\t%.6f\t%v\t%q\t%v\t%v\t%v\t%q\t\n" + fmtString = "%v\t%v\t%v\t'%v'\t'%v'\t%v\t%v\t%.6f\t%.6f\t%.6f\t%.6f\t%v\t%q\t%v\t%v\t%v\t%q\t%q\t%q\t%q\t\n" case streamlog.QueryLogFormatJSON: - fmtString = "{\"Method\": %q, \"RemoteAddr\": %q, \"Username\": %q, \"ImmediateCaller\": %q, \"Effective Caller\": %q, \"Start\": \"%v\", \"End\": \"%v\", \"TotalTime\": %.6f, \"PlanTime\": %v, \"ExecuteTime\": %v, \"CommitTime\": %v, \"StmtType\": %q, \"SQL\": %q, \"BindVars\": %v, \"ShardQueries\": %v, \"RowsAffected\": %v, \"Error\": %q}\n" + fmtString = "{\"Method\": %q, \"RemoteAddr\": %q, \"Username\": %q, \"ImmediateCaller\": %q, \"Effective Caller\": %q, \"Start\": \"%v\", \"End\": \"%v\", \"TotalTime\": %.6f, \"PlanTime\": %v, \"ExecuteTime\": %v, \"CommitTime\": %v, \"StmtType\": %q, \"SQL\": %q, \"BindVars\": %v, \"ShardQueries\": %v, \"RowsAffected\": %v, \"Error\": %q, \"Keyspace\": %q, \"Table\": %q, \"TabletType\": %q}\n" } _, err := fmt.Fprintf( @@ -176,6 +178,9 @@ func (stats *LogStats) Logf(w io.Writer, params url.Values) error { stats.ShardQueries, stats.RowsAffected, stats.ErrorStr(), + stats.Keyspace, + stats.Table, + stats.TabletType, ) return err } diff --git a/go/vt/vtgate/logstats_test.go b/go/vt/vtgate/logstats_test.go index feb6eb63ed5..71d5226ba79 100644 --- a/go/vt/vtgate/logstats_test.go +++ b/go/vt/vtgate/logstats_test.go @@ -44,12 +44,15 @@ func TestLogStatsFormat(t *testing.T) { logStats := NewLogStats(context.Background(), "test", "sql1", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)}) logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC) logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC) + logStats.Keyspace = "ks" + logStats.Table = "table" + logStats.TabletType = "MASTER" params := map[string][]string{"full": {}} *streamlog.RedactDebugUIQueries = false *streamlog.QueryLogFormat = "text" got := testFormat(logStats, url.Values(params)) - want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\n" + want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\"ks\"\t\"table\"\t\"MASTER\"\t\n" if got != want { t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) } @@ -57,7 +60,7 @@ func TestLogStatsFormat(t *testing.T) { *streamlog.RedactDebugUIQueries = true *streamlog.QueryLogFormat = "text" got = testFormat(logStats, url.Values(params)) - want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t\"[REDACTED]\"\t0\t0\t\"\"\t\n" + want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t\"[REDACTED]\"\t0\t0\t\"\"\t\"ks\"\t\"table\"\t\"MASTER\"\t\n" if got != want { t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) } @@ -74,7 +77,7 @@ func TestLogStatsFormat(t *testing.T) { if err != nil { t.Errorf("logstats format: error marshaling json: %v -- got:\n%v", err, got) } - want = "{\n \"BindVars\": {\n \"intVal\": {\n \"type\": \"INT64\",\n \"value\": 1\n }\n },\n \"CommitTime\": 0,\n \"Effective Caller\": \"\",\n \"End\": \"2017-01-01 01:02:04.000001\",\n \"Error\": \"\",\n \"ExecuteTime\": 0,\n \"ImmediateCaller\": \"\",\n \"Method\": \"test\",\n \"PlanTime\": 0,\n \"RemoteAddr\": \"\",\n \"RowsAffected\": 0,\n \"SQL\": \"sql1\",\n \"ShardQueries\": 0,\n \"Start\": \"2017-01-01 01:02:03.000000\",\n \"StmtType\": \"\",\n \"TotalTime\": 1.000001,\n \"Username\": \"\"\n}" + want = "{\n \"BindVars\": {\n \"intVal\": {\n \"type\": \"INT64\",\n \"value\": 1\n }\n },\n \"CommitTime\": 0,\n \"Effective Caller\": \"\",\n \"End\": \"2017-01-01 01:02:04.000001\",\n \"Error\": \"\",\n \"ExecuteTime\": 0,\n \"ImmediateCaller\": \"\",\n \"Keyspace\": \"ks\",\n \"Method\": \"test\",\n \"PlanTime\": 0,\n \"RemoteAddr\": \"\",\n \"RowsAffected\": 0,\n \"SQL\": \"sql1\",\n \"ShardQueries\": 0,\n \"Start\": \"2017-01-01 01:02:03.000000\",\n \"StmtType\": \"\",\n \"Table\": \"table\",\n \"TabletType\": \"MASTER\",\n \"TotalTime\": 1.000001,\n \"Username\": \"\"\n}" if string(formatted) != want { t.Errorf("logstats format: got:\n%q\nwant:\n%v\n", string(formatted), want) } @@ -90,7 +93,7 @@ func TestLogStatsFormat(t *testing.T) { if err != nil { t.Errorf("logstats format: error marshaling json: %v -- got:\n%v", err, got) } - want = "{\n \"BindVars\": \"[REDACTED]\",\n \"CommitTime\": 0,\n \"Effective Caller\": \"\",\n \"End\": \"2017-01-01 01:02:04.000001\",\n \"Error\": \"\",\n \"ExecuteTime\": 0,\n \"ImmediateCaller\": \"\",\n \"Method\": \"test\",\n \"PlanTime\": 0,\n \"RemoteAddr\": \"\",\n \"RowsAffected\": 0,\n \"SQL\": \"sql1\",\n \"ShardQueries\": 0,\n \"Start\": \"2017-01-01 01:02:03.000000\",\n \"StmtType\": \"\",\n \"TotalTime\": 1.000001,\n \"Username\": \"\"\n}" + want = "{\n \"BindVars\": \"[REDACTED]\",\n \"CommitTime\": 0,\n \"Effective Caller\": \"\",\n \"End\": \"2017-01-01 01:02:04.000001\",\n \"Error\": \"\",\n \"ExecuteTime\": 0,\n \"ImmediateCaller\": \"\",\n \"Keyspace\": \"ks\",\n \"Method\": \"test\",\n \"PlanTime\": 0,\n \"RemoteAddr\": \"\",\n \"RowsAffected\": 0,\n \"SQL\": \"sql1\",\n \"ShardQueries\": 0,\n \"Start\": \"2017-01-01 01:02:03.000000\",\n \"StmtType\": \"\",\n \"Table\": \"table\",\n \"TabletType\": \"MASTER\",\n \"TotalTime\": 1.000001,\n \"Username\": \"\"\n}" if string(formatted) != want { t.Errorf("logstats format: got:\n%q\nwant:\n%v\n", string(formatted), want) } @@ -103,7 +106,7 @@ func TestLogStatsFormat(t *testing.T) { *streamlog.QueryLogFormat = "text" got = testFormat(logStats, url.Values(params)) - want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\tmap[strVal:type:VARBINARY value:\"abc\" ]\t0\t0\t\"\"\t\n" + want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\tmap[strVal:type:VARBINARY value:\"abc\" ]\t0\t0\t\"\"\t\"ks\"\t\"table\"\t\"MASTER\"\t\n" if got != want { t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) } @@ -118,7 +121,7 @@ func TestLogStatsFormat(t *testing.T) { if err != nil { t.Errorf("logstats format: error marshaling json: %v -- got:\n%v", err, got) } - want = "{\n \"BindVars\": {\n \"strVal\": {\n \"type\": \"VARBINARY\",\n \"value\": \"abc\"\n }\n },\n \"CommitTime\": 0,\n \"Effective Caller\": \"\",\n \"End\": \"2017-01-01 01:02:04.000001\",\n \"Error\": \"\",\n \"ExecuteTime\": 0,\n \"ImmediateCaller\": \"\",\n \"Method\": \"test\",\n \"PlanTime\": 0,\n \"RemoteAddr\": \"\",\n \"RowsAffected\": 0,\n \"SQL\": \"sql1\",\n \"ShardQueries\": 0,\n \"Start\": \"2017-01-01 01:02:03.000000\",\n \"StmtType\": \"\",\n \"TotalTime\": 1.000001,\n \"Username\": \"\"\n}" + want = "{\n \"BindVars\": {\n \"strVal\": {\n \"type\": \"VARBINARY\",\n \"value\": \"abc\"\n }\n },\n \"CommitTime\": 0,\n \"Effective Caller\": \"\",\n \"End\": \"2017-01-01 01:02:04.000001\",\n \"Error\": \"\",\n \"ExecuteTime\": 0,\n \"ImmediateCaller\": \"\",\n \"Keyspace\": \"ks\",\n \"Method\": \"test\",\n \"PlanTime\": 0,\n \"RemoteAddr\": \"\",\n \"RowsAffected\": 0,\n \"SQL\": \"sql1\",\n \"ShardQueries\": 0,\n \"Start\": \"2017-01-01 01:02:03.000000\",\n \"StmtType\": \"\",\n \"Table\": \"table\",\n \"TabletType\": \"MASTER\",\n \"TotalTime\": 1.000001,\n \"Username\": \"\"\n}" if string(formatted) != want { t.Errorf("logstats format: got:\n%q\nwant:\n%v\n", string(formatted), want) } @@ -135,14 +138,14 @@ func TestLogStatsFilter(t *testing.T) { params := map[string][]string{"full": {}} got := testFormat(logStats, url.Values(params)) - want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\n" + want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\"\"\t\"\"\t\"\"\t\n" if got != want { t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) } *streamlog.QueryLogFilterTag = "LOG_THIS_QUERY" got = testFormat(logStats, url.Values(params)) - want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\n" + want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\"\"\t\"\"\t\"\"\t\n" if got != want { t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) } diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 5d4e271d2d3..e944861afec 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -207,6 +207,9 @@ func (e *planExecute) executePlan(ctx context.Context, plan *engine.Plan, vcurso } // 5: Log and add statistics + logStats.Keyspace = plan.Instructions.GetKeyspaceName() + logStats.Table = plan.Instructions.GetTableName() + logStats.TabletType = vcursor.TabletType().String() errCount := e.logExecutionEnd(logStats, execStart, plan, err, qr) plan.AddStats(1, time.Since(logStats.StartTime), uint64(logStats.ShardQueries), logStats.RowsAffected, errCount) diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 0a92c6dbf95..a932f280bea 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -60,6 +60,11 @@ var ( _ = flag.Bool("disable_local_gateway", false, "deprecated: if specified, this process will not route any queries to local tablets in the local cell") maxMemoryRows = flag.Int("max_memory_rows", 300000, "Maximum number of rows that will be held in memory for intermediate results as well as the final result.") warnMemoryRows = flag.Int("warn_memory_rows", 30000, "Warning threshold for in-memory results. A row count higher than this amount will cause the VtGateWarnings.ResultsExceeded counter to be incremented.") + + // TODO(deepthi): change these two vars to unexported and move to healthcheck.go when LegacyHealthcheck is removed + + maxPayloadSize = flag.Int("max_payload_size", 0, "The threshold for query payloads in bytes. A payload greater than this threshold will result in a failure to handle the query.") + warnPayloadSize = flag.Int("warn_payload_size", 0, "The warning threshold for query payloads in bytes. A payload greater than this threshold will cause the VtGateWarnings.WarnPayloadSizeExceeded counter to be incremented.") ) func getTxMode() vtgatepb.TransactionMode { @@ -187,7 +192,7 @@ func Init(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, ce _ = stats.NewRates("ErrorsByDbType", stats.CounterForDimension(errorCounts, "DbType"), 15, 1*time.Minute) _ = stats.NewRates("ErrorsByCode", stats.CounterForDimension(errorCounts, "Code"), 15, 1*time.Minute) - warnings = stats.NewCountersWithSingleLabel("VtGateWarnings", "Vtgate warnings", "type", "IgnoredSet", "ResultsExceeded") + warnings = stats.NewCountersWithSingleLabel("VtGateWarnings", "Vtgate warnings", "type", "IgnoredSet", "ResultsExceeded", "WarnPayloadSizeExceeded") servenv.OnRun(func() { for _, f := range RegisterVTGates { diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go index 8e622b4777f..cc22df3367e 100644 --- a/go/vt/wrangler/schema.go +++ b/go/vt/wrangler/schema.go @@ -186,7 +186,7 @@ func (wr *Wrangler) ValidateSchemaShard(ctx context.Context, keyspace, shard str // ValidateSchemaKeyspace will diff the schema from all the tablets in // the keyspace. -func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, excludeTables []string, includeViews bool) error { +func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, excludeTables []string, includeViews, skipNoMaster bool) error { // find all the shards shards, err := wr.ts.GetShardNames(ctx, keyspace) if err != nil { @@ -202,42 +202,15 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, return wr.ValidateSchemaShard(ctx, keyspace, shards[0], excludeTables, includeViews) } - // find the reference schema using the first shard's master - si, err := wr.ts.GetShard(ctx, keyspace, shards[0]) - if err != nil { - return fmt.Errorf("GetShard(%v, %v) failed: %v", keyspace, shards[0], err) - } - if !si.HasMaster() { - return fmt.Errorf("no master in shard %v/%v", keyspace, shards[0]) - } - referenceAlias := si.MasterAlias - log.Infof("Gathering schema for reference master %v", topoproto.TabletAliasString(referenceAlias)) - referenceSchema, err := wr.GetSchema(ctx, referenceAlias, nil, excludeTables, includeViews) - if err != nil { - return fmt.Errorf("GetSchema(%v, nil, %v, %v) failed: %v", referenceAlias, excludeTables, includeViews, err) - } + var referenceSchema *tabletmanagerdatapb.SchemaDefinition + var referenceAlias *topodatapb.TabletAlias // then diff with all other tablets everywhere er := concurrency.AllErrorRecorder{} wg := sync.WaitGroup{} - // first diff the slaves in the reference shard 0 - aliases, err := wr.ts.FindAllTabletAliasesInShard(ctx, keyspace, shards[0]) - if err != nil { - return fmt.Errorf("FindAllTabletAliasesInShard(%v, %v) failed: %v", keyspace, shards[0], err) - } - - for _, alias := range aliases { - if topoproto.TabletAliasEqual(alias, si.MasterAlias) { - continue - } - - wg.Add(1) - go wr.diffSchema(ctx, referenceSchema, referenceAlias, alias, excludeTables, includeViews, &wg, &er) - } - // then diffs all tablets in the other shards - for _, shard := range shards[1:] { + for _, shard := range shards[0:] { si, err := wr.ts.GetShard(ctx, keyspace, shard) if err != nil { er.RecordError(fmt.Errorf("GetShard(%v, %v) failed: %v", keyspace, shard, err)) @@ -245,10 +218,21 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, } if !si.HasMaster() { - er.RecordError(fmt.Errorf("no master in shard %v/%v", keyspace, shard)) + if !skipNoMaster { + er.RecordError(fmt.Errorf("no master in shard %v/%v", keyspace, shard)) + } continue } + if referenceSchema == nil { + referenceAlias = si.MasterAlias + log.Infof("Gathering schema for reference master %v", topoproto.TabletAliasString(referenceAlias)) + referenceSchema, err = wr.GetSchema(ctx, referenceAlias, nil, excludeTables, includeViews) + if err != nil { + return fmt.Errorf("GetSchema(%v, nil, %v, %v) failed: %v", referenceAlias, excludeTables, includeViews, err) + } + } + aliases, err := wr.ts.FindAllTabletAliasesInShard(ctx, keyspace, shard) if err != nil { er.RecordError(fmt.Errorf("FindAllTabletAliasesInShard(%v, %v) failed: %v", keyspace, shard, err)) @@ -256,6 +240,10 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, } for _, alias := range aliases { + // Don't diff schemas for self + if referenceAlias == alias { + continue + } wg.Add(1) go wr.diffSchema(ctx, referenceSchema, referenceAlias, alias, excludeTables, includeViews, &wg, &er) } diff --git a/tools/build_version_flags.sh b/tools/build_version_flags.sh index b2effa7fa42..c9ea079d135 100755 --- a/tools/build_version_flags.sh +++ b/tools/build_version_flags.sh @@ -21,20 +21,14 @@ source $DIR/shell_functions.inc # a tar ball might be used, which will prevent the git metadata from being available. # Should this be the case then allow environment variables to be used to source # this information instead. -_build_git_rev=$(git rev-parse --short HEAD) -if [ -z "$_build_git_rev" ]; then - _build_git_rev="$BUILD_GIT_REV" -fi -_build_git_branch=$(git rev-parse --abbrev-ref HEAD) -if [ -z "$_build_git_branch" ]; then - _build_git_branch="$BUILD_GIT_BRANCH" -fi +DEFAULT_BUILD_GIT_REV=$(git rev-parse --short HEAD) +DEFAULT_BUILD_GIT_BRANCH=$(git rev-parse --abbrev-ref HEAD) echo "\ -X 'vitess.io/vitess/go/vt/servenv.buildHost=$(hostname)' \ -X 'vitess.io/vitess/go/vt/servenv.buildUser=$(whoami)' \ - -X 'vitess.io/vitess/go/vt/servenv.buildGitRev=${_build_git_rev}' \ - -X 'vitess.io/vitess/go/vt/servenv.buildGitBranch=${_build_git_branch}' \ + -X 'vitess.io/vitess/go/vt/servenv.buildGitRev=${BUILD_GIT_REV:-$DEFAULT_BUILD_GIT_REV}' \ + -X 'vitess.io/vitess/go/vt/servenv.buildGitBranch=${BUILD_GIT_BRANCH:-$DEFAULT_BUILD_GIT_BRANCH}' \ -X 'vitess.io/vitess/go/vt/servenv.buildTime=$(LC_ALL=C date)' \ -X 'vitess.io/vitess/go/vt/servenv.jenkinsBuildNumberStr=${BUILD_NUMBER}' \ "