Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions go/vt/vtgate/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type (
signal func() // a function that we'll call whenever we have new schema data

// map of keyspace currently tracked
trackedMu sync.Mutex
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking we should make it easy to see in the struct declaration which fields are protected by which mutex. trackedMu is kind of easy, but what the mu mutex is protecting is hard to see here.

tracked map[keyspaceStr]*updateController
consumeDelay time.Duration

Expand Down Expand Up @@ -96,7 +97,7 @@ func (t *Tracker) LoadKeyspace(conn queryservice.QueryService, target *querypb.T
return err
}

t.tracked[target.Keyspace].setLoaded(true)
t.setLoaded(target.Keyspace, true)
return nil
}

Expand Down Expand Up @@ -209,8 +210,8 @@ func (t *Tracker) Start() {
// getKeyspaceUpdateController returns the updateController for the given keyspace
// the updateController will be created if there was none.
func (t *Tracker) getKeyspaceUpdateController(th *discovery.TabletHealth) *updateController {
t.mu.Lock()
defer t.mu.Unlock()
t.trackedMu.Lock()
defer t.trackedMu.Unlock()

ksUpdater, exists := t.tracked[th.Target.Keyspace]
if !exists {
Expand All @@ -224,6 +225,16 @@ func (t *Tracker) newUpdateController() *updateController {
return &updateController{update: t.updateSchema, reloadKeyspace: t.initKeyspace, signal: t.signal, consumeDelay: t.consumeDelay}
}

// setLoaded sets the loaded status for the given keyspace.
func (t *Tracker) setLoaded(ks keyspaceStr, loaded bool) {
t.trackedMu.Lock()
defer t.trackedMu.Unlock()

if ksUpdater, exists := t.tracked[ks]; exists {
ksUpdater.setLoaded(loaded)
}
}

func (t *Tracker) initKeyspace(th *discovery.TabletHealth) error {
err := t.LoadKeyspace(th.Conn, th.Target)
if err != nil {
Expand Down Expand Up @@ -343,7 +354,7 @@ func (t *Tracker) updatedTableSchema(th *discovery.TabletHealth) bool {
return nil
})
if err != nil {
t.tracked[th.Target.Keyspace].setLoaded(false)
t.setLoaded(th.Target.Keyspace, false)
// TODO: optimize for the tables that got errored out.
log.Warningf("error fetching new schema for %v, making them non-authoritative: %v", tablesUpdated, err)
return false
Expand Down Expand Up @@ -451,7 +462,7 @@ func (t *Tracker) updatedViewSchema(th *discovery.TabletHealth) bool {
return nil
})
if err != nil {
t.tracked[th.Target.Keyspace].setLoaded(false)
t.setLoaded(th.Target.Keyspace, false)
// TODO: optimize for the views that got errored out.
log.Warningf("error fetching new views definition for %v", viewsUpdated, err)
return false
Expand All @@ -467,8 +478,9 @@ func (t *Tracker) updateViews(keyspace string, res map[string]string) {

// RegisterSignalReceiver allows a function to register to be called when new schema is available
func (t *Tracker) RegisterSignalReceiver(f func()) {
t.mu.Lock()
defer t.mu.Unlock()
t.trackedMu.Lock()
defer t.trackedMu.Unlock()

for _, controller := range t.tracked {
controller.signal = f
}
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vtgate/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,38 @@ func TestTrackerGetKeyspaceUpdateController(t *testing.T) {
assert.Nil(t, ks3.reloadKeyspace, "ks3 already initialized")
}

// TestTrackerNoLock tests that processing of health check is not blocked while tracking is making GetSchema rpc calls.
func TestTrackerNoLock(t *testing.T) {
ch := make(chan *discovery.TabletHealth)
tracker := NewTracker(ch, true, false, sqlparser.NewTestParser())
tracker.consumeDelay = 1 * time.Millisecond
tracker.Start()
defer tracker.Stop()

target := &querypb.Target{Cell: cell, Keyspace: keyspace, Shard: "-80", TabletType: topodatapb.TabletType_PRIMARY}
tablet := &topodatapb.Tablet{Keyspace: target.Keyspace, Shard: target.Shard, Type: target.TabletType}

sbc := sandboxconn.NewSandboxConn(tablet)
sbc.GetSchemaDelayResponse = 100 * time.Millisecond

th := &discovery.TabletHealth{
Conn: sbc,
Tablet: tablet,
Target: target,
Serving: true,
Stats: &querypb.RealtimeStats{TableSchemaChanged: []string{"t1"}},
}

for i := 0; i < 500000; i++ {
select {
case ch <- th:
case <-time.After(5 * time.Millisecond):
t.Fatalf("failed to send health check to tracker")
}
}
require.GreaterOrEqual(t, sbc.GetSchemaCount.Load(), int64(1), "GetSchema rpc should be called")
}

type myTable struct {
name, create string
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type SandboxConn struct {
ReserveCount atomic.Int64
ReleaseCount atomic.Int64
GetSchemaCount atomic.Int64
GetSchemaDelayResponse time.Duration

queriesRequireLocking bool
queriesMu sync.Mutex
Expand Down Expand Up @@ -740,6 +741,9 @@ func (sbc *SandboxConn) Release(ctx context.Context, target *querypb.Target, tra
// GetSchema implements the QueryService interface
func (sbc *SandboxConn) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error {
sbc.GetSchemaCount.Add(1)
if sbc.GetSchemaDelayResponse > 0 {
time.Sleep(sbc.GetSchemaDelayResponse)
}
if len(sbc.getSchemaResult) == 0 {
return nil
}
Expand Down
Loading