diff --git a/go/vt/vttablet/tabletserver/planbuilder/dml.go b/go/vt/vttablet/tabletserver/planbuilder/dml.go index 5e67140fa72..fa8de26171e 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/dml.go +++ b/go/vt/vttablet/tabletserver/planbuilder/dml.go @@ -317,10 +317,20 @@ func analyzeInsert(ins *sqlparser.Insert, tables map[string]*schema.Table) (plan return plan, nil } tableName := sqlparser.GetTableName(ins.Table) - if tableName.IsEmpty() { + switch { + case tableName.IsTopic(): + msgTables := getTopicTables() + for _, t := range msgTables { + // recursively run inserts + } + plan.Reason = ReasonTopic + return plan, nil + + case tableName.IsEmpty(): plan.Reason = ReasonTable return plan, nil } + table, tableErr := plan.setTable(tableName, tables) switch { diff --git a/go/vt/vttablet/tabletserver/planbuilder/plan.go b/go/vt/vttablet/tabletserver/planbuilder/plan.go index 65aa41c71a1..8522f27d4ae 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/plan.go +++ b/go/vt/vttablet/tabletserver/planbuilder/plan.go @@ -154,6 +154,7 @@ const ( ReasonReplace ReasonMultiTable NumReasons + ReasonTopic ) // Must exactly match order of reason constants. diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 65ad2dc0961..b9cebae9613 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -54,6 +54,7 @@ type Engine struct { mu sync.Mutex isOpen bool tables map[string]*Table + topics map[string][]*Table lastChange int64 reloadTime time.Duration notifiers map[string]notifier @@ -166,6 +167,9 @@ func (se *Engine) Open() error { // Skip over the table that had an error and move on to the next one return } + // register the message topic on the engine if necessary + se.registerTopic(table) + table.SetMysqlStats(row[4], row[5], row[6], row[7], row[8]) mu.Lock() tables[tableName] = table @@ -336,6 +340,10 @@ func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string tabletenv.InternalErrors.Add("Schema", 1) return vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "tableWasCreatedOrAltered: failed to load table %s: %v", tableName, err) } + + // register the message topic on the engine if necessary + se.registerTopic(table) + // table_rows, data_length, index_length, max_data_length table.SetMysqlStats(row[4], row[5], row[6], row[7], row[8]) @@ -355,6 +363,32 @@ func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string return nil } +// registerTopic optionally connects the vt_topic metadata on a message table +// to a map of topic strings. A table can belong to only one topic. +func (se *Engine) registerTopic(ta *Table) { + if ta.MessageInfo == nil || ta.MessageInfo.Topic == "" { + return + } + + // lazily initialize the topic map if necessary + if se.topics == nil { + se.topics = make(map[string][]*Table) + } + + msgTables, ok := se.topics[ta.MessageInfo.Topic] + if ok { + // check to see if this table is already registered to the topic + for _, t := range msgTables { + if t == ta { + return + } + } + } + + // append this table to the list of subscribed tables to the topic + se.topics[ta.MessageInfo.Topic] = append(se.topics[ta.MessageInfo.Topic], ta) +} + // RegisterNotifier registers the function for schema change notification. // It also causes an immediate notification to the caller. The notified // function must not change the map or its contents. The only exception diff --git a/go/vt/vttablet/tabletserver/schema/load_table.go b/go/vt/vttablet/tabletserver/schema/load_table.go index 3257af030d0..4fbdf02866d 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table.go +++ b/go/vt/vttablet/tabletserver/schema/load_table.go @@ -157,6 +157,12 @@ func loadMessageInfo(ta *Table, comment string) error { keyvals[kv[0]] = kv[1] } var err error + if ta.MessageInfo.Topic, err = getString(keyvals, "vt_topic"); err != nil { + // the topic is an optional value + if err.Error() != "attribute vt_topic not specified for message table" { + return err + } + } if ta.MessageInfo.AckWaitDuration, err = getDuration(keyvals, "vt_ack_wait"); err != nil { return err } @@ -239,3 +245,11 @@ func getNum(in map[string]string, key string) (int, error) { } return v, nil } + +func getString(in map[string]string, key string) (string, error) { + sv := in[key] + if sv == "" { + return 0, fmt.Errorf("attribute %s not specified for message table", key) + } + return sv, nil +} diff --git a/go/vt/vttablet/tabletserver/schema/schema.go b/go/vt/vttablet/tabletserver/schema/schema.go index 5e11e33ceab..7e16b7548fb 100644 --- a/go/vt/vttablet/tabletserver/schema/schema.go +++ b/go/vt/vttablet/tabletserver/schema/schema.go @@ -99,6 +99,10 @@ type MessageInfo struct { // returned for subscribers. Fields []*querypb.Field + // Optional topic to subscribe to. Any messages + // published to the topic will be added to this table. + Topic string + // AckWaitDuration specifies how long to wait after // the message was first sent. The back-off doubles // every attempt.