Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ const (
// not throttled.
NotThrottled time.Duration = 0

// ZeroRateNoProgess can be used to set maxRate to 0. In this case, the
// ZeroRateNoProgress can be used to set maxRate to 0. In this case, the
// throttler won't let any requests through until the rate is increased again.
ZeroRateNoProgess = 0
ZeroRateNoProgress = 0

// MaxRateModuleDisabled can be set in NewThrottler() to disable throttling
// by a fixed rate.
Expand Down Expand Up @@ -262,7 +262,7 @@ func (t *Throttler) updateMaxRate() {
return
}

if maxRate != ZeroRateNoProgess && maxRate < int64(threadsRunning) {
if maxRate != ZeroRateNoProgress && maxRate < int64(threadsRunning) {
log.Warningf("Set maxRate is less than the number of threads (%v). To prevent threads from starving, maxRate was increased from: %v to: %v.", threadsRunning, maxRate, threadsRunning)
maxRate = int64(threadsRunning)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/throttler/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func TestThreadFinished(t *testing.T) {
func TestThrottle_MaxRateIsZero(t *testing.T) {
fc := &fakeClock{}
// 1 Thread, 0 QPS.
throttler, _ := newThrottlerWithClock("test", "queries", 1, ZeroRateNoProgess, ReplicationLagModuleDisabled, fc.now)
throttler, _ := newThrottlerWithClock("test", "queries", 1, ZeroRateNoProgress, ReplicationLagModuleDisabled, fc.now)
defer throttler.Close()

fc.setNow(1000 * time.Millisecond)
Expand Down
15 changes: 8 additions & 7 deletions go/vt/worker/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ const (
// StreamExecute response. As of 06/2015, the default for it was 32 kB.
// Note that higher values for this flag --destination_pack_count will
// increase memory consumption in vtworker, vttablet and mysql.
defaultDestinationPackCount = 10
defaultDestinationWriterCount = 20
defaultMinHealthyRdonlyTablets = 2
defaultDestTabletType = "RDONLY"
defaultParallelDiffsCount = 8
defaultMaxTPS = throttler.MaxRateModuleDisabled
defaultMaxReplicationLag = throttler.ReplicationLagModuleDisabled
defaultDestinationPackCount = 10
defaultDestinationWriterCount = 20
defaultMinHealthyTablets = 2
defaultDestTabletType = "RDONLY"
defaultParallelDiffsCount = 8
defaultMaxTPS = throttler.MaxRateModuleDisabled
defaultMaxReplicationLag = throttler.ReplicationLagModuleDisabled
defaultUseConsistentSnapshot = false
)
143 changes: 141 additions & 2 deletions go/vt/worker/diff_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"time"

"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"
"vitess.io/vitess/go/vt/wrangler"

"golang.org/x/net/context"

Expand All @@ -36,6 +38,7 @@ import (
"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtgate/vindexes"
Expand Down Expand Up @@ -157,8 +160,8 @@ func (qrr *QueryResultReader) Fields() []*querypb.Field {
}

// Close closes the connection to the tablet.
func (qrr *QueryResultReader) Close(ctx context.Context) error {
return qrr.conn.Close(ctx)
func (qrr *QueryResultReader) Close(ctx context.Context) {
qrr.conn.Close(ctx)
}

// v3KeyRangeFilter is a sqltypes.ResultStream implementation that filters
Expand Down Expand Up @@ -266,6 +269,16 @@ func TransactionalTableScan(ctx context.Context, log logutil.Logger, ts *topo.Se
return NewTransactionalQueryResultReaderForTablet(ctx, ts, tabletAlias, sql, txID)
}

// CreateTargetFrom builds the query.Target (cell/keyspace/shard/tablet type)
// that the given tablet serves, for use in query service RPCs.
func CreateTargetFrom(tablet *topodatapb.Tablet) *query.Target {
	target := &query.Target{
		Cell:       tablet.Alias.Cell,
		Keyspace:   tablet.Keyspace,
		Shard:      tablet.Shard,
		TabletType: tablet.Type,
	}
	return target
}

// TableScanByKeyRange returns a QueryResultReader that gets all the
// rows from a table that match the supplied KeyRange, ordered by
// Primary Key. The returned columns are ordered with the Primary Key
Expand Down Expand Up @@ -637,3 +650,129 @@ func (rd *RowDiffer) Go(log logutil.Logger) (dr DiffReport, err error) {
advanceRight = true
}
}

// createTransactions opens numberOfScanners read-only transactions with
// CONSISTENT_SNAPSHOT isolation on the given query service and returns their
// transaction IDs. All transactions see the same point in the transaction
// history only if the caller has paused writes/replication on the tablet for
// the duration of this call (see CreateConsistentTransactions). A rollback
// for each transaction is registered with the cleaner.
func createTransactions(ctx context.Context, numberOfScanners int, wr *wrangler.Wrangler, cleaner *wrangler.Cleaner, queryService queryservice.QueryService, target *query.Target, tabletInfo *topodatapb.Tablet) ([]int64, error) {
	scanners := make([]int64, numberOfScanners)
	for i := 0; i < numberOfScanners; i++ {
		tx, err := queryService.Begin(ctx, target, &query.ExecuteOptions{
			// Make sure our tx is not killed by tx sniper
			Workload:             query.ExecuteOptions_DBA,
			TransactionIsolation: query.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY,
		})
		if err != nil {
			return nil, fmt.Errorf("could not open transaction on %v\n%v", topoproto.TabletAliasString(tabletInfo.Alias), err)
		}

		// Remember to rollback the transactions
		cleaner.Record("CloseTransaction", topoproto.TabletAliasString(tabletInfo.Alias), func(ctx context.Context, wr *wrangler.Wrangler) error {
			qs, err := tabletconn.GetDialer()(tabletInfo, true)
			if err != nil {
				return err
			}
			// Close the freshly dialed connection once the rollback is
			// done; otherwise it would leak.
			defer qs.Close(ctx)
			return qs.Rollback(ctx, target, tx)
		})

		scanners[i] = tx
	}

	return scanners, nil
}

// TableScanner is a simple abstraction that lets callers run full table
// scans without needing to know whether the underlying connection is
// transactional (TransactionalTableScanner) or not
// (NonTransactionalTableScanner).
type TableScanner interface {
	// ScanTable returns a reader over all rows of the given table.
	ScanTable(ctx context.Context, td *tabletmanagerdatapb.TableDefinition) (*QueryResultReader, error)
}

// TransactionalTableScanner works inside of a transaction set up with CONSISTENT SNAPSHOT
type TransactionalTableScanner struct {
	// wr provides the logger and topo server used for scans.
	wr *wrangler.Wrangler
	// cleaner records cleanup work registered by the creator of this scanner.
	cleaner *wrangler.Cleaner
	// tabletAlias identifies the tablet the scans run against.
	tabletAlias *topodatapb.TabletAlias
	// queryService is the connection to the tablet.
	queryService queryservice.QueryService
	// tx is the ID of the already-open CONSISTENT SNAPSHOT transaction.
	tx int64
}

// ScanTable streams all rows of the given table through the scanner's open
// CONSISTENT SNAPSHOT transaction, ordered by the primary keys, if any.
func (ts TransactionalTableScanner) ScanTable(ctx context.Context, td *tabletmanagerdatapb.TableDefinition) (*QueryResultReader, error) {
	logger := ts.wr.Logger()
	topoServer := ts.wr.TopoServer()
	return TransactionalTableScan(ctx, logger, topoServer, ts.tabletAlias, ts.tx, td)
}

// NonTransactionalTableScanner just passes through the queries, and relies on paused replication traffic taking care of the consistent snapshot part
type NonTransactionalTableScanner struct {
	// wr provides the logger and topo server used for scans.
	wr *wrangler.Wrangler
	// cleaner records cleanup work registered by the creator of this scanner.
	cleaner *wrangler.Cleaner
	// tabletAlias identifies the tablet the scans run against.
	tabletAlias *topodatapb.TabletAlias
	// queryService is the connection to the tablet.
	queryService queryservice.QueryService
}

// ScanTable streams all rows of the given table over a plain (non-transactional)
// connection, ordered by the primary keys, if any.
func (ns NonTransactionalTableScanner) ScanTable(ctx context.Context, td *tabletmanagerdatapb.TableDefinition) (*QueryResultReader, error) {
	logger := ns.wr.Logger()
	topoServer := ns.wr.TopoServer()
	return TableScan(ctx, logger, topoServer, ns.tabletAlias, td)
}

// CreateConsistentTableScanners will momentarily stop updates on the tablet,
// and then create connections that are all consistent snapshots of the same
// point in the transaction history. It returns one TableScanner per requested
// scanner plus the executed GTID position at which the snapshots were taken.
func CreateConsistentTableScanners(ctx context.Context, tablet *topo.TabletInfo, wr *wrangler.Wrangler, cleaner *wrangler.Cleaner, numberOfScanners int) ([]TableScanner, string, error) {
	txs, gtid, err := CreateConsistentTransactions(ctx, tablet, wr, cleaner, numberOfScanners)
	if err != nil {
		return nil, "", err
	}

	queryService, err := tabletconn.GetDialer()(tablet.Tablet, true)
	if err != nil {
		// Without the error check, a failed dial would leave queryService
		// nil and the deferred Close below would panic.
		return nil, "", fmt.Errorf("could not connect to tablet %v\n%v", topoproto.TabletAliasString(tablet.Tablet.Alias), err)
	}
	defer queryService.Close(ctx)

	scanners := make([]TableScanner, numberOfScanners)
	for i, tx := range txs {
		scanners[i] = TransactionalTableScanner{
			wr:           wr,
			cleaner:      cleaner,
			tabletAlias:  tablet.Alias,
			queryService: queryService,
			tx:           tx,
		}
	}

	return scanners, gtid, nil
}

// CreateConsistentTransactions creates a number of consistent snapshot
// transactions, all starting from the same spot in the tx log. It pauses
// writes by taking a global read lock on the tablet's tables, opens the
// transactions, reads the executed GTID position, and then releases the lock.
// It returns the transaction IDs and the GTID position they correspond to.
func CreateConsistentTransactions(ctx context.Context, tablet *topo.TabletInfo, wr *wrangler.Wrangler, cleaner *wrangler.Cleaner, numberOfScanners int) ([]int64, string, error) {
	tm := tmclient.NewTabletManagerClient()
	defer tm.Close()

	// Lock all tables with a read lock to pause replication
	err := tm.LockTables(ctx, tablet.Tablet)
	if err != nil {
		return nil, "", fmt.Errorf("could not lock tables on %v\n%v", topoproto.TabletAliasString(tablet.Tablet.Alias), err)
	}
	defer func() {
		tm := tmclient.NewTabletManagerClient()
		defer tm.Close()
		// Surface unlock failures instead of dropping them silently - a
		// tablet left locked would block all writes.
		if err := tm.UnlockTables(ctx, tablet.Tablet); err != nil {
			wr.Logger().Warningf("could not unlock tables on %v: %v", topoproto.TabletAliasString(tablet.Tablet.Alias), err)
			return
		}
		wr.Logger().Infof("tables unlocked on %v", topoproto.TabletAliasString(tablet.Tablet.Alias))
	}()

	wr.Logger().Infof("tables locked on %v", topoproto.TabletAliasString(tablet.Tablet.Alias))
	target := CreateTargetFrom(tablet.Tablet)

	// Create transactions
	queryService, err := tabletconn.GetDialer()(tablet.Tablet, true)
	if err != nil {
		// Without the error check, a failed dial would leave queryService
		// nil and the deferred Close below would panic.
		return nil, "", fmt.Errorf("could not connect to tablet %v\n%v", topoproto.TabletAliasString(tablet.Tablet.Alias), err)
	}
	defer queryService.Close(ctx)
	connections, err := createTransactions(ctx, numberOfScanners, wr, cleaner, queryService, target, tablet.Tablet)
	if err != nil {
		return nil, "", fmt.Errorf("failed to create transactions on %v: %v", topoproto.TabletAliasString(tablet.Tablet.Alias), err)
	}
	wr.Logger().Infof("transactions created on %v", topoproto.TabletAliasString(tablet.Tablet.Alias))
	executedGtid, err := tm.MasterPosition(ctx, tablet.Tablet)
	if err != nil {
		return nil, "", fmt.Errorf("could not read executed GTID set on %v\n%v", topoproto.TabletAliasString(tablet.Tablet.Alias), err)
	}

	return connections, executedGtid, nil
}
4 changes: 2 additions & 2 deletions go/vt/worker/legacy_split_clone_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func commandLegacySplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *flag
sourceReaderCount := subFlags.Int("source_reader_count", defaultSourceReaderCount, "number of concurrent streaming queries to use on the source")
destinationPackCount := subFlags.Int("destination_pack_count", defaultDestinationPackCount, "number of packets to pack in one destination insert")
destinationWriterCount := subFlags.Int("destination_writer_count", defaultDestinationWriterCount, "number of concurrent RPCs to execute on the destination")
minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyRdonlyTablets, "minimum number of healthy RDONLY tablets before taking out one")
minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyTablets, "minimum number of healthy RDONLY tablets before taking out one")
maxTPS := subFlags.Int64("max_tps", defaultMaxTPS, "if non-zero, limit copy to maximum number of (write) transactions/second on the destination (unlimited by default)")
if err := subFlags.Parse(args); err != nil {
return nil, err
Expand Down Expand Up @@ -146,7 +146,7 @@ func interactiveLegacySplitClone(ctx context.Context, wi *Instance, wr *wrangler
result["DefaultSourceReaderCount"] = fmt.Sprintf("%v", defaultSourceReaderCount)
result["DefaultDestinationPackCount"] = fmt.Sprintf("%v", defaultDestinationPackCount)
result["DefaultDestinationWriterCount"] = fmt.Sprintf("%v", defaultDestinationWriterCount)
result["DefaultMinHealthyRdonlyTablets"] = fmt.Sprintf("%v", defaultMinHealthyRdonlyTablets)
result["DefaultMinHealthyRdonlyTablets"] = fmt.Sprintf("%v", defaultMinHealthyTablets)
result["DefaultMaxTPS"] = fmt.Sprintf("%v", defaultMaxTPS)
return nil, legacySplitCloneTemplate2, result, nil
}
Expand Down
Loading