Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"regexp"
"sync"
"testing"

Expand Down Expand Up @@ -330,9 +331,9 @@ func TestVStreamSharded(t *testing.T) {
received bool
}
expectedEvents := []*expectedEvent{
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"-80"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"-80"}`, false},
{`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"11"}} keyspace:"ks" shard:"-80"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"80-"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"80-"}`, false},
{`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"44"}} keyspace:"ks" shard:"80-"}`, false},
}
for {
Expand All @@ -357,7 +358,7 @@ func TestVStreamSharded(t *testing.T) {
for _, ev := range evs {
s := fmt.Sprintf("%v", ev)
for _, expectedEv := range expectedEvents {
if expectedEv.ev == s {
if removeAnyDeprecatedDisplayWidths(expectedEv.ev) == removeAnyDeprecatedDisplayWidths(s) {
expectedEv.received = true
break
}
Expand All @@ -381,6 +382,17 @@ func TestVStreamSharded(t *testing.T) {

}

func removeAnyDeprecatedDisplayWidths(orig string) string {
var adjusted string
baseIntType := "int"
intRE := regexp.MustCompile(`(?i)int\(([0-9]*)?\)`)
adjusted = intRE.ReplaceAllString(orig, baseIntType)
baseYearType := "year"
yearRE := regexp.MustCompile(`(?i)year\(([0-9]*)?\)`)
adjusted = yearRE.ReplaceAllString(adjusted, baseYearType)
return adjusted
}

var printMu sync.Mutex

func printEvents(evs []*binlogdatapb.VEvent) {
Expand Down
28 changes: 28 additions & 0 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync"
"time"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/schema"
Expand Down Expand Up @@ -587,6 +589,32 @@ func (se *Engine) GetSchema() map[string]*Table {
return tables
}

// MarshalMinimalSchema returns a protobuf encoded binlogdata.MinimalSchema
func (se *Engine) MarshalMinimalSchema() ([]byte, error) {
se.mu.Lock()
defer se.mu.Unlock()
dbSchema := &binlogdatapb.MinimalSchema{
Tables: make([]*binlogdatapb.MinimalTable, 0, len(se.tables)),
}
for _, table := range se.tables {
dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table))
}
return proto.Marshal(dbSchema)
}

func newMinimalTable(st *Table) *binlogdatapb.MinimalTable {
table := &binlogdatapb.MinimalTable{
Name: st.Name.String(),
Fields: st.Fields,
}
pkc := make([]int64, len(st.PKColumns))
for i, pk := range st.PKColumns {
pkc[i] = int64(pk)
}
table.PKColumns = pkc
return table
}

// GetConnection returns a connection from the pool
func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error) {
return se.conns.Get(ctx, nil)
Expand Down
25 changes: 3 additions & 22 deletions go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"sync"
"time"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/schema"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -240,14 +238,10 @@ func (tr *Tracker) schemaUpdated(gtid string, ddl string, timestamp int64) error
}

func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, timestamp int64) error {
tables := tr.engine.GetSchema()
dbSchema := &binlogdatapb.MinimalSchema{
Tables: []*binlogdatapb.MinimalTable{},
}
for _, table := range tables {
dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table))
blob, err := tr.engine.MarshalMinimalSchema()
if err != nil {
return err
}
blob, _ := proto.Marshal(dbSchema)

conn, err := tr.engine.GetConnection(ctx)
if err != nil {
Expand All @@ -265,19 +259,6 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string,
return nil
}

func newMinimalTable(st *Table) *binlogdatapb.MinimalTable {
table := &binlogdatapb.MinimalTable{
Name: st.Name.String(),
Fields: st.Fields,
}
var pkc []int64
for _, pk := range st.PKColumns {
pkc = append(pkc, int64(pk))
}
table.PKColumns = pkc
return table
}

func encodeString(in string) string {
buf := bytes.NewBuffer(nil)
sqltypes.NewVarChar(in).EncodeSQL(buf)
Expand Down
9 changes: 7 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,14 @@ func (rs *rowStreamer) buildPlan() error {
return err
}
ti := &Table{
Name: st.Name,
Fields: st.Fields,
Name: st.Name,
}

ti.Fields, err = getFields(rs.ctx, rs.cp, st.Name, rs.cp.DBName(), st.Fields)
if err != nil {
return err
}

// The plan we build is identical to the one for vstreamer.
// This is because the row format of a read is identical
// to the row format of a binlog event. So, the same
Expand Down
Loading