Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtbackup.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ Flags:
--stderrthreshold severityFlag logs at or above this threshold go to stderr (default 1)
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ Flags:
--tablet_hostname string if not empty, this hostname will be assumed instead of trying to resolve it
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ Flags:
--tablet_health_keep_alive duration close streaming tablet health connection if there are no requests for this long (default 5m0s)
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Flags:
--table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ Flags:
--tablet_hostname string if not empty, this hostname will be assumed instead of trying to resolve it
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vttestserver.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ Flags:
--tablet_hostname string The hostname to use for the tablet otherwise it will be derived from OS' hostname (default "localhost")
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
88 changes: 44 additions & 44 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named

var waitGroup sync.WaitGroup
var tablet *topodatapb.Tablet
var fullStatus *replicationdatapb.FullStatus
var fs *replicationdatapb.FullStatus
readingStartTime := time.Now()
instance := NewInstance()
instanceFound := false
Expand Down Expand Up @@ -208,7 +208,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named
goto Cleanup
}

fullStatus, err = FullStatus(tabletAlias)
fs, err = fullStatus(tabletAlias)
if err != nil {
goto Cleanup
}
Expand All @@ -218,48 +218,48 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named
instance.Port = int(tablet.MysqlPort)
{
// We begin with a few operations we can run concurrently, and which do not depend on anything
instance.ServerID = uint(fullStatus.ServerId)
instance.Version = fullStatus.Version
instance.ReadOnly = fullStatus.ReadOnly
instance.LogBinEnabled = fullStatus.LogBinEnabled
instance.BinlogFormat = fullStatus.BinlogFormat
instance.LogReplicationUpdatesEnabled = fullStatus.LogReplicaUpdates
instance.VersionComment = fullStatus.VersionComment

if instance.LogBinEnabled && fullStatus.PrimaryStatus != nil {
binlogPos, err := getBinlogCoordinatesFromPositionString(fullStatus.PrimaryStatus.FilePosition)
instance.ServerID = uint(fs.ServerId)
instance.Version = fs.Version
instance.ReadOnly = fs.ReadOnly
instance.LogBinEnabled = fs.LogBinEnabled
instance.BinlogFormat = fs.BinlogFormat
instance.LogReplicationUpdatesEnabled = fs.LogReplicaUpdates
instance.VersionComment = fs.VersionComment

if instance.LogBinEnabled && fs.PrimaryStatus != nil {
binlogPos, err := getBinlogCoordinatesFromPositionString(fs.PrimaryStatus.FilePosition)
instance.SelfBinlogCoordinates = binlogPos
errorChan <- err
}

instance.SemiSyncPrimaryEnabled = fullStatus.SemiSyncPrimaryEnabled
instance.SemiSyncReplicaEnabled = fullStatus.SemiSyncReplicaEnabled
instance.SemiSyncPrimaryWaitForReplicaCount = uint(fullStatus.SemiSyncWaitForReplicaCount)
instance.SemiSyncPrimaryTimeout = fullStatus.SemiSyncPrimaryTimeout
instance.SemiSyncPrimaryEnabled = fs.SemiSyncPrimaryEnabled
instance.SemiSyncReplicaEnabled = fs.SemiSyncReplicaEnabled
instance.SemiSyncPrimaryWaitForReplicaCount = uint(fs.SemiSyncWaitForReplicaCount)
instance.SemiSyncPrimaryTimeout = fs.SemiSyncPrimaryTimeout

instance.SemiSyncPrimaryClients = uint(fullStatus.SemiSyncPrimaryClients)
instance.SemiSyncPrimaryStatus = fullStatus.SemiSyncPrimaryStatus
instance.SemiSyncReplicaStatus = fullStatus.SemiSyncReplicaStatus
instance.SemiSyncPrimaryClients = uint(fs.SemiSyncPrimaryClients)
instance.SemiSyncPrimaryStatus = fs.SemiSyncPrimaryStatus
instance.SemiSyncReplicaStatus = fs.SemiSyncReplicaStatus

if instance.IsOracleMySQL() || instance.IsPercona() {
// Stuff only supported on Oracle / Percona MySQL
// ...
// @@gtid_mode only available in Oracle / Percona MySQL >= 5.6
instance.GTIDMode = fullStatus.GtidMode
instance.ServerUUID = fullStatus.ServerUuid
if fullStatus.PrimaryStatus != nil {
GtidExecutedPos, err := replication.DecodePosition(fullStatus.PrimaryStatus.Position)
instance.GTIDMode = fs.GtidMode
instance.ServerUUID = fs.ServerUuid
if fs.PrimaryStatus != nil {
GtidExecutedPos, err := replication.DecodePosition(fs.PrimaryStatus.Position)
errorChan <- err
if err == nil && GtidExecutedPos.GTIDSet != nil {
instance.ExecutedGtidSet = GtidExecutedPos.GTIDSet.String()
}
}
GtidPurgedPos, err := replication.DecodePosition(fullStatus.GtidPurged)
GtidPurgedPos, err := replication.DecodePosition(fs.GtidPurged)
errorChan <- err
if err == nil && GtidPurgedPos.GTIDSet != nil {
instance.GtidPurged = GtidPurgedPos.GTIDSet.String()
}
instance.BinlogRowImage = fullStatus.BinlogRowImage
instance.BinlogRowImage = fs.BinlogRowImage

if instance.GTIDMode != "" && instance.GTIDMode != "OFF" {
instance.SupportsOracleGTID = true
Expand All @@ -269,45 +269,45 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named

instance.ReplicationIOThreadState = ReplicationThreadStateNoThread
instance.ReplicationSQLThreadState = ReplicationThreadStateNoThread
if fullStatus.ReplicationStatus != nil {
instance.HasReplicationCredentials = fullStatus.ReplicationStatus.SourceUser != ""
if fs.ReplicationStatus != nil {
instance.HasReplicationCredentials = fs.ReplicationStatus.SourceUser != ""

instance.ReplicationIOThreadState = ReplicationThreadStateFromReplicationState(replication.ReplicationState(fullStatus.ReplicationStatus.IoState))
instance.ReplicationSQLThreadState = ReplicationThreadStateFromReplicationState(replication.ReplicationState(fullStatus.ReplicationStatus.SqlState))
instance.ReplicationIOThreadState = ReplicationThreadStateFromReplicationState(replication.ReplicationState(fs.ReplicationStatus.IoState))
instance.ReplicationSQLThreadState = ReplicationThreadStateFromReplicationState(replication.ReplicationState(fs.ReplicationStatus.SqlState))
instance.ReplicationIOThreadRuning = instance.ReplicationIOThreadState.IsRunning()
instance.ReplicationSQLThreadRuning = instance.ReplicationSQLThreadState.IsRunning()

binlogPos, err := getBinlogCoordinatesFromPositionString(fullStatus.ReplicationStatus.RelayLogSourceBinlogEquivalentPosition)
binlogPos, err := getBinlogCoordinatesFromPositionString(fs.ReplicationStatus.RelayLogSourceBinlogEquivalentPosition)
instance.ReadBinlogCoordinates = binlogPos
errorChan <- err

binlogPos, err = getBinlogCoordinatesFromPositionString(fullStatus.ReplicationStatus.FilePosition)
binlogPos, err = getBinlogCoordinatesFromPositionString(fs.ReplicationStatus.FilePosition)
instance.ExecBinlogCoordinates = binlogPos
errorChan <- err
instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates()

binlogPos, err = getBinlogCoordinatesFromPositionString(fullStatus.ReplicationStatus.RelayLogFilePosition)
binlogPos, err = getBinlogCoordinatesFromPositionString(fs.ReplicationStatus.RelayLogFilePosition)
instance.RelaylogCoordinates = binlogPos
instance.RelaylogCoordinates.Type = RelayLog
errorChan <- err

instance.LastSQLError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(fullStatus.ReplicationStatus.LastSqlError), "")
instance.LastIOError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(fullStatus.ReplicationStatus.LastIoError), "")
instance.LastSQLError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(fs.ReplicationStatus.LastSqlError), "")
instance.LastIOError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(fs.ReplicationStatus.LastIoError), "")

instance.SQLDelay = fullStatus.ReplicationStatus.SqlDelay
instance.UsingOracleGTID = fullStatus.ReplicationStatus.AutoPosition
instance.UsingMariaDBGTID = fullStatus.ReplicationStatus.UsingGtid
instance.SourceUUID = fullStatus.ReplicationStatus.SourceUuid
instance.HasReplicationFilters = fullStatus.ReplicationStatus.HasReplicationFilters
instance.SQLDelay = fs.ReplicationStatus.SqlDelay
instance.UsingOracleGTID = fs.ReplicationStatus.AutoPosition
instance.UsingMariaDBGTID = fs.ReplicationStatus.UsingGtid
instance.SourceUUID = fs.ReplicationStatus.SourceUuid
instance.HasReplicationFilters = fs.ReplicationStatus.HasReplicationFilters

instance.SourceHost = fullStatus.ReplicationStatus.SourceHost
instance.SourcePort = int(fullStatus.ReplicationStatus.SourcePort)
instance.SourceHost = fs.ReplicationStatus.SourceHost
instance.SourcePort = int(fs.ReplicationStatus.SourcePort)

if fullStatus.ReplicationStatus.ReplicationLagUnknown {
if fs.ReplicationStatus.ReplicationLagUnknown {
instance.SecondsBehindPrimary.Valid = false
} else {
instance.SecondsBehindPrimary.Valid = true
instance.SecondsBehindPrimary.Int64 = int64(fullStatus.ReplicationStatus.ReplicationLagSeconds)
instance.SecondsBehindPrimary.Int64 = int64(fs.ReplicationStatus.ReplicationLagSeconds)
}
if instance.SecondsBehindPrimary.Valid && instance.SecondsBehindPrimary.Int64 < 0 {
log.Warningf("Alias: %+v, instance.SecondsBehindPrimary < 0 [%+v], correcting to 0", tabletAlias, instance.SecondsBehindPrimary.Int64)
Expand All @@ -316,7 +316,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named
// And until told otherwise:
instance.ReplicationLagSeconds = instance.SecondsBehindPrimary

instance.AllowTLS = fullStatus.ReplicationStatus.SslAllowed
instance.AllowTLS = fs.ReplicationStatus.SslAllowed
}

instanceFound = true
Expand Down
23 changes: 7 additions & 16 deletions go/vt/vtorc/inst/tablet_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,20 @@ import (

// ErrTabletAliasNil is a fixed error message.
var ErrTabletAliasNil = errors.New("tablet alias is nil")
var tmc tmclient.TabletManagerClient

// ResetReplicationParameters resets the replication parameters on the given tablet.
func ResetReplicationParameters(tabletAlias string) error {
tablet, err := ReadTablet(tabletAlias)
if err != nil {
return err
}
tmc := tmclient.NewTabletManagerClient()
tmcCtx, tmcCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer tmcCancel()
if err := tmc.ResetReplicationParameters(tmcCtx, tablet); err != nil {
return err
}
return nil
// InitializeTMC initializes the tablet manager client to use for all VTOrc RPC calls.
func InitializeTMC() tmclient.TabletManagerClient {
tmc = tmclient.NewTabletManagerClient()
return tmc
}

// FullStatus gets the full status of the MySQL running in vttablet.
func FullStatus(tabletAlias string) (*replicationdatapb.FullStatus, error) {
// fullStatus gets the full status of the MySQL running in vttablet.
func fullStatus(tabletAlias string) (*replicationdatapb.FullStatus, error) {
tablet, err := ReadTablet(tabletAlias)
if err != nil {
return nil, err
}
tmc := tmclient.NewTabletManagerClient()
tmcCtx, tmcCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer tmcCancel()
return tmc.FullStatus(tmcCtx, tablet)
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func RegisterFlags(fs *pflag.FlagSet) {
// channel for polling.
func OpenTabletDiscovery() <-chan time.Time {
ts = topo.Open()
tmc = tmclient.NewTabletManagerClient()
tmc = inst.InitializeTMC()
// Clear existing cache and perform a new refresh.
if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil {
log.Error(err)
Expand Down Expand Up @@ -302,6 +302,11 @@ func changeTabletType(ctx context.Context, tablet *topodatapb.Tablet, tabletType
return tmc.ChangeType(ctx, tablet, tabletType, semiSync)
}

// resetReplicationParameters resets the replication parameters on the given tablet.
func resetReplicationParameters(ctx context.Context, tablet *topodatapb.Tablet) error {
return tmc.ResetReplicationParameters(ctx, tablet)
}

// setReplicationSource calls the said RPC with the parameters provided
func setReplicationSource(ctx context.Context, replica *topodatapb.Tablet, primary *topodatapb.Tablet, semiSync bool) error {
return tmc.SetReplicationSource(ctx, replica, primary.Alias, 0, "", true, semiSync)
Expand Down
Loading