Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion go/vt/vttablet/tabletserver/planbuilder/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,20 @@ func analyzeInsert(ins *sqlparser.Insert, tables map[string]*schema.Table) (plan
return plan, nil
}
tableName := sqlparser.GetTableName(ins.Table)
if tableName.IsEmpty() {
switch {
case tableName.IsTopic():
msgTables := getTopicTables()
for _, t := range msgTables {
// recursively run inserts
}
plan.Reason = ReasonTopic
return plan, nil

case tableName.IsEmpty():
plan.Reason = ReasonTable
return plan, nil
}

table, tableErr := plan.setTable(tableName, tables)

switch {
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/planbuilder/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ const (
ReasonReplace
ReasonMultiTable
NumReasons
ReasonTopic
)

// Must exactly match order of reason constants.
Expand Down
34 changes: 34 additions & 0 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Engine struct {
mu sync.Mutex
isOpen bool
tables map[string]*Table
topics map[string][]*Table
lastChange int64
reloadTime time.Duration
notifiers map[string]notifier
Expand Down Expand Up @@ -166,6 +167,9 @@ func (se *Engine) Open() error {
// Skip over the table that had an error and move on to the next one
return
}
// register the message topic on the engine if necessary
se.registerTopic(table)

table.SetMysqlStats(row[4], row[5], row[6], row[7], row[8])
mu.Lock()
tables[tableName] = table
Expand Down Expand Up @@ -336,6 +340,10 @@ func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string
tabletenv.InternalErrors.Add("Schema", 1)
return vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "tableWasCreatedOrAltered: failed to load table %s: %v", tableName, err)
}

// register the message topic on the engine if necessary
se.registerTopic(table)

// table_rows, data_length, index_length, max_data_length
table.SetMysqlStats(row[4], row[5], row[6], row[7], row[8])

Expand All @@ -355,6 +363,32 @@ func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string
return nil
}

// registerTopic optionally connects the vt_topic metadata on a message table
// to a map of topic strings. A table can belong to only one topic.
func (se *Engine) registerTopic(ta *Table) {
if ta.MessageInfo == nil || ta.MessageInfo.Topic == "" {
return
}

// lazily initialize the topic map if necessary
if se.topics == nil {
se.topics = make(map[string][]*Table)
}

msgTables, ok := se.topics[ta.MessageInfo.Topic]
if ok {
// check to see if this table is already registered to the topic
for _, t := range msgTables {
if t == ta {
return
}
}
}

// append this table to the list of subscribed tables to the topic
se.topics[ta.MessageInfo.Topic] = append(se.topics[ta.MessageInfo.Topic], ta)
}

// RegisterNotifier registers the function for schema change notification.
// It also causes an immediate notification to the caller. The notified
// function must not change the map or its contents. The only exception
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vttablet/tabletserver/schema/load_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ func loadMessageInfo(ta *Table, comment string) error {
keyvals[kv[0]] = kv[1]
}
var err error
if ta.MessageInfo.Topic, err = getString(keyvals, "vt_topic"); err != nil {
// the topic is an optional value
if err.Error() != "attribute vt_topic not specified for message table" {
return err
}
}
if ta.MessageInfo.AckWaitDuration, err = getDuration(keyvals, "vt_ack_wait"); err != nil {
return err
}
Expand Down Expand Up @@ -239,3 +245,11 @@ func getNum(in map[string]string, key string) (int, error) {
}
return v, nil
}

func getString(in map[string]string, key string) (string, error) {
sv := in[key]
if sv == "" {
return 0, fmt.Errorf("attribute %s not specified for message table", key)
}
return sv, nil
}
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletserver/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ type MessageInfo struct {
// returned for subscribers.
Fields []*querypb.Field

// Optional topic to subscribe to. Any messages
// published to the topic will be added to this table.
Topic string

// AckWaitDuration specifies how long to wait after
// the message was first sent. The back-off doubles
// every attempt.
Expand Down