diff --git a/go/vt/proto/vtctldata/vtctldata.pb.go b/go/vt/proto/vtctldata/vtctldata.pb.go index 0b7171a412a..8734dc87f02 100644 --- a/go/vt/proto/vtctldata/vtctldata.pb.go +++ b/go/vt/proto/vtctldata/vtctldata.pb.go @@ -111,26 +111,193 @@ func (m *ExecuteVtctlCommandResponse) GetEvent() *logutil.Event { return nil } +// TableMaterializeSttings contains the settings for one table. +type TableMaterializeSettings struct { + TargetTable string `protobuf:"bytes,1,opt,name=target_table,json=targetTable,proto3" json:"target_table,omitempty"` + // source_expression is a select statement. + SourceExpression string `protobuf:"bytes,2,opt,name=source_expression,json=sourceExpression,proto3" json:"source_expression,omitempty"` + // create_ddl contains the DDL to create the target table. + // If empty, the target table must already exist. + // if "copy", the target table DDL is the same as the source table. + CreateDdl string `protobuf:"bytes,3,opt,name=create_ddl,json=createDdl,proto3" json:"create_ddl,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *TableMaterializeSettings) Reset() { *m = TableMaterializeSettings{} } +func (m *TableMaterializeSettings) String() string { return proto.CompactTextString(m) } +func (*TableMaterializeSettings) ProtoMessage() {} +func (*TableMaterializeSettings) Descriptor() ([]byte, []int) { + return fileDescriptor_f41247b323a1ab2e, []int{2} +} + +func (m *TableMaterializeSettings) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_TableMaterializeSettings.Unmarshal(m, b) +} +func (m *TableMaterializeSettings) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_TableMaterializeSettings.Marshal(b, m, deterministic) +} +func (m *TableMaterializeSettings) XXX_Merge(src proto.Message) { + xxx_messageInfo_TableMaterializeSettings.Merge(m, src) +} +func (m *TableMaterializeSettings) XXX_Size() int { + return xxx_messageInfo_TableMaterializeSettings.Size(m) +} +func (m *TableMaterializeSettings) XXX_DiscardUnknown() { + xxx_messageInfo_TableMaterializeSettings.DiscardUnknown(m) +} + +var xxx_messageInfo_TableMaterializeSettings proto.InternalMessageInfo + +func (m *TableMaterializeSettings) GetTargetTable() string { + if m != nil { + return m.TargetTable + } + return "" +} + +func (m *TableMaterializeSettings) GetSourceExpression() string { + if m != nil { + return m.SourceExpression + } + return "" +} + +func (m *TableMaterializeSettings) GetCreateDdl() string { + if m != nil { + return m.CreateDdl + } + return "" +} + +// MaterializeSettings contains the settings for the Materialize command. +type MaterializeSettings struct { + // workflow is the name of the workflow. + Workflow string `protobuf:"bytes,1,opt,name=workflow,proto3" json:"workflow,omitempty"` + SourceKeyspace string `protobuf:"bytes,2,opt,name=source_keyspace,json=sourceKeyspace,proto3" json:"source_keyspace,omitempty"` + TargetKeyspace string `protobuf:"bytes,3,opt,name=target_keyspace,json=targetKeyspace,proto3" json:"target_keyspace,omitempty"` + // stop_after_copy specifies if vreplication should be stopped after copying. + StopAfterCopy bool `protobuf:"varint,4,opt,name=stop_after_copy,json=stopAfterCopy,proto3" json:"stop_after_copy,omitempty"` + TableSettings []*TableMaterializeSettings `protobuf:"bytes,5,rep,name=table_settings,json=tableSettings,proto3" json:"table_settings,omitempty"` + // optional parameters. + Cell string `protobuf:"bytes,6,opt,name=cell,proto3" json:"cell,omitempty"` + TabletTypes string `protobuf:"bytes,7,opt,name=tablet_types,json=tabletTypes,proto3" json:"tablet_types,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *MaterializeSettings) Reset() { *m = MaterializeSettings{} } +func (m *MaterializeSettings) String() string { return proto.CompactTextString(m) } +func (*MaterializeSettings) ProtoMessage() {} +func (*MaterializeSettings) Descriptor() ([]byte, []int) { + return fileDescriptor_f41247b323a1ab2e, []int{3} +} + +func (m *MaterializeSettings) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_MaterializeSettings.Unmarshal(m, b) +} +func (m *MaterializeSettings) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_MaterializeSettings.Marshal(b, m, deterministic) +} +func (m *MaterializeSettings) XXX_Merge(src proto.Message) { + xxx_messageInfo_MaterializeSettings.Merge(m, src) +} +func (m *MaterializeSettings) XXX_Size() int { + return xxx_messageInfo_MaterializeSettings.Size(m) +} +func (m *MaterializeSettings) XXX_DiscardUnknown() { + xxx_messageInfo_MaterializeSettings.DiscardUnknown(m) +} + +var xxx_messageInfo_MaterializeSettings proto.InternalMessageInfo + +func (m *MaterializeSettings) GetWorkflow() string { + if m != nil { + return m.Workflow + } + return "" +} + +func (m *MaterializeSettings) GetSourceKeyspace() string { + if m != nil { + return m.SourceKeyspace + } + return "" +} + +func (m *MaterializeSettings) GetTargetKeyspace() string { + if m != nil { + return m.TargetKeyspace + } + return "" +} + +func (m *MaterializeSettings) GetStopAfterCopy() bool { + if m != nil { + return m.StopAfterCopy + } + return false +} + +func (m *MaterializeSettings) GetTableSettings() []*TableMaterializeSettings { + if m != nil { + return m.TableSettings + } + return nil +} + +func (m *MaterializeSettings) GetCell() string { + if m != nil { + return m.Cell + } + return "" +} + +func (m *MaterializeSettings) GetTabletTypes() string { + if m != nil { + return m.TabletTypes + } + return "" +} + func init() { proto.RegisterType((*ExecuteVtctlCommandRequest)(nil), "vtctldata.ExecuteVtctlCommandRequest") proto.RegisterType((*ExecuteVtctlCommandResponse)(nil), "vtctldata.ExecuteVtctlCommandResponse") + proto.RegisterType((*TableMaterializeSettings)(nil), "vtctldata.TableMaterializeSettings") + proto.RegisterType((*MaterializeSettings)(nil), "vtctldata.MaterializeSettings") } func init() { proto.RegisterFile("vtctldata.proto", fileDescriptor_f41247b323a1ab2e) } var fileDescriptor_f41247b323a1ab2e = []byte{ - // 200 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0xcf, 0xd1, 0x4a, 0x87, 0x30, - 0x14, 0x06, 0x70, 0xd6, 0xbf, 0x82, 0xff, 0x42, 0x83, 0x5d, 0x89, 0xdd, 0x88, 0x54, 0xec, 0xca, - 0x41, 0xbd, 0x41, 0xe2, 0x0b, 0x8c, 0x28, 0xe8, 0x26, 0x96, 0x1e, 0x64, 0xa0, 0x3b, 0xe6, 0xce, - 0x46, 0x8f, 0x1f, 0x3a, 0xf2, 0xaa, 0xbb, 0x8f, 0xdf, 0x37, 0xc6, 0x77, 0xf8, 0x6d, 0xa4, 0x9e, - 0xa6, 0xc1, 0x90, 0x69, 0x96, 0x15, 0x09, 0xc5, 0xf9, 0x80, 0x32, 0x9b, 0x70, 0x0c, 0x64, 0xa7, - 0xd4, 0xd4, 0xef, 0xbc, 0xec, 0x7e, 0xa0, 0x0f, 0x04, 0x6f, 0xdb, 0x93, 0x16, 0xe7, 0xd9, 0xb8, - 0x41, 0xc3, 0x77, 0x00, 0x4f, 0x42, 0xf0, 0x4b, 0xb3, 0x8e, 0xbe, 0x60, 0xd5, 0x49, 0x9e, 0xf5, - 0x9e, 0xc5, 0x03, 0xcf, 0x4d, 0x4f, 0x16, 0xdd, 0x27, 0xd9, 0x19, 0x30, 0x50, 0x71, 0x51, 0x31, - 0x79, 0xd2, 0x59, 0xd2, 0xd7, 0x84, 0x75, 0xcb, 0xef, 0xfe, 0xfd, 0xd8, 0x2f, 0xe8, 0x3c, 0x88, - 0x7b, 0x7e, 0x05, 0x11, 0x1c, 0x15, 0xac, 0x62, 0xf2, 0xe6, 0x29, 0x6f, 0xfe, 0x66, 0x75, 0x9b, - 0xea, 0x54, 0xbe, 0xc8, 0x8f, 0xc7, 0x68, 0x09, 0xbc, 0x6f, 0x2c, 0xaa, 0x94, 0xd4, 0x88, 0x2a, - 0x92, 0xda, 0xd7, 0xab, 0xe3, 0xac, 0xaf, 0xeb, 0x1d, 0x9e, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, - 0xa9, 0xd5, 0x57, 0x1c, 0xfb, 0x00, 0x00, 0x00, + // 422 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x92, 0xd1, 0x6e, 0xd3, 0x30, + 0x14, 0x86, 0x95, 0xb5, 0x1b, 0xeb, 0x29, 0x4d, 0xc1, 0xdc, 0x58, 0x45, 0x48, 0xa1, 0xc0, 0x88, + 0x84, 0xd4, 0x48, 0xe3, 0x09, 0xa0, 0xf4, 0x06, 0xc4, 0x4d, 0xa8, 0x40, 0xe2, 0x26, 0x72, 0x93, + 0xb3, 0xc8, 0x9a, 0x1b, 0x07, 0xfb, 0xa4, 0x5b, 0x79, 0x03, 0x5e, 0x86, 0x67, 0x44, 0xb6, 0xb3, + 0x70, 0xb3, 0xdd, 0x1d, 0x7f, 0xe7, 0xb7, 0xfd, 0x9f, 0x5f, 0x07, 0xe6, 0x07, 0x2a, 0x49, 0x55, + 0x82, 0xc4, 0xaa, 0x35, 0x9a, 0x34, 0x9b, 0x0c, 0x60, 0x31, 0x53, 0xba, 0xee, 0x48, 0xaa, 0xd0, + 0x59, 0xfe, 0x80, 0xc5, 0xe6, 0x16, 0xcb, 0x8e, 0xf0, 0xbb, 0x93, 0xac, 0xf5, 0x7e, 0x2f, 0x9a, + 0x2a, 0xc7, 0x5f, 0x1d, 0x5a, 0x62, 0x0c, 0xc6, 0xc2, 0xd4, 0x96, 0x47, 0xc9, 0x28, 0x9d, 0xe4, + 0xbe, 0x66, 0x6f, 0x20, 0x16, 0x25, 0x49, 0xdd, 0x14, 0x24, 0xf7, 0xa8, 0x3b, 0xe2, 0x27, 0x49, + 0x94, 0x8e, 0xf2, 0x59, 0xa0, 0xdb, 0x00, 0x97, 0x6b, 0x78, 0x7e, 0xef, 0xc3, 0xb6, 0xd5, 0x8d, + 0x45, 0xf6, 0x1a, 0x4e, 0xf1, 0x80, 0x0d, 0xf1, 0x28, 0x89, 0xd2, 0xe9, 0x65, 0xbc, 0xba, 0xb3, + 0xb5, 0x71, 0x34, 0x0f, 0xcd, 0xe5, 0x9f, 0x08, 0xf8, 0x56, 0xec, 0x14, 0x7e, 0x15, 0x84, 0x46, + 0x0a, 0x25, 0x7f, 0xe3, 0x37, 0x24, 0x92, 0x4d, 0x6d, 0xd9, 0x4b, 0x78, 0x4c, 0xc2, 0xd4, 0x48, + 0x05, 0x39, 0x89, 0x7f, 0x69, 0x92, 0x4f, 0x03, 0xf3, 0xb7, 0xd8, 0x3b, 0x78, 0x6a, 0x75, 0x67, + 0x4a, 0x2c, 0xf0, 0xb6, 0x35, 0x68, 0xad, 0xd4, 0x8d, 0xb7, 0x3b, 0xc9, 0x9f, 0x84, 0xc6, 0x66, + 0xe0, 0xec, 0x05, 0x40, 0x69, 0x50, 0x10, 0x16, 0x55, 0xa5, 0xf8, 0xc8, 0xab, 0x26, 0x81, 0x7c, + 0xaa, 0xd4, 0xf2, 0xef, 0x09, 0x3c, 0xbb, 0xcf, 0xc6, 0x02, 0xce, 0x6f, 0xb4, 0xb9, 0xbe, 0x52, + 0xfa, 0xa6, 0xb7, 0x30, 0x9c, 0xd9, 0x5b, 0x98, 0xf7, 0xff, 0x5f, 0xe3, 0xd1, 0xb6, 0xa2, 0xc4, + 0xfe, 0xf7, 0x38, 0xe0, 0x2f, 0x3d, 0x75, 0xc2, 0x7e, 0x96, 0x41, 0x18, 0x0c, 0xc4, 0x01, 0x0f, + 0xc2, 0x0b, 0x98, 0x5b, 0xd2, 0x6d, 0x21, 0xae, 0x08, 0x4d, 0x51, 0xea, 0xf6, 0xc8, 0xc7, 0x49, + 0x94, 0x9e, 0xe7, 0x33, 0x87, 0x3f, 0x38, 0xba, 0xd6, 0xed, 0x91, 0x7d, 0x86, 0xd8, 0xa7, 0x52, + 0xd8, 0xde, 0x27, 0x3f, 0x4d, 0x46, 0xe9, 0xf4, 0xf2, 0xd5, 0xea, 0xff, 0x6e, 0x3c, 0x94, 0x6c, + 0x3e, 0xf3, 0x57, 0x87, 0x09, 0x19, 0x8c, 0x4b, 0x54, 0x8a, 0x9f, 0x79, 0x47, 0xbe, 0x0e, 0xe1, + 0xef, 0x94, 0x0b, 0xff, 0xd8, 0xa2, 0xe5, 0x8f, 0xee, 0xc2, 0x77, 0x6c, 0xeb, 0xd0, 0xc7, 0xf4, + 0xe7, 0xc5, 0x41, 0x12, 0x5a, 0xbb, 0x92, 0x3a, 0x0b, 0x55, 0x56, 0xeb, 0xec, 0x40, 0x99, 0x5f, + 0xbd, 0x6c, 0x30, 0xb2, 0x3b, 0xf3, 0xe0, 0xfd, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x46, 0x37, + 0xd0, 0x53, 0xb8, 0x02, 0x00, 0x00, } diff --git a/go/vt/sqlparser/analyzer.go b/go/vt/sqlparser/analyzer.go index 534903e9a71..69f8bd2621f 100644 --- a/go/vt/sqlparser/analyzer.go +++ b/go/vt/sqlparser/analyzer.go @@ -175,6 +175,31 @@ func SplitAndExpression(filters []Expr, node Expr) []Expr { return append(filters, node) } +// TableFromStatement returns the qualified table name for the query. +// This works only for select statements. +func TableFromStatement(sql string) (TableName, error) { + stmt, err := Parse(sql) + if err != nil { + return TableName{}, err + } + sel, ok := stmt.(*Select) + if !ok { + return TableName{}, fmt.Errorf("unrecognized statement: %s", sql) + } + if len(sel.From) != 1 { + return TableName{}, fmt.Errorf("table expression is complex") + } + aliased, ok := sel.From[0].(*AliasedTableExpr) + if !ok { + return TableName{}, fmt.Errorf("table expression is complex") + } + tableName, ok := aliased.Expr.(TableName) + if !ok { + return TableName{}, fmt.Errorf("table expression is complex") + } + return tableName, nil +} + // GetTableName returns the table name from the SimpleTableExpr // only if it's a simple expression. Otherwise, it returns "". func GetTableName(node SimpleTableExpr) TableIdent { diff --git a/go/vt/sqlparser/analyzer_test.go b/go/vt/sqlparser/analyzer_test.go index f97f36ccbc2..c73f691632d 100644 --- a/go/vt/sqlparser/analyzer_test.go +++ b/go/vt/sqlparser/analyzer_test.go @@ -160,6 +160,49 @@ func TestSplitAndExpression(t *testing.T) { } } +func TestTableFromStatement(t *testing.T) { + testcases := []struct { + in, out string + }{{ + in: "select * from t", + out: "t", + }, { + in: "select * from t.t", + out: "t.t", + }, { + in: "select * from t1, t2", + out: "table expression is complex", + }, { + in: "select * from (t)", + out: "table expression is complex", + }, { + in: "select * from t1 join t2", + out: "table expression is complex", + }, { + in: "select * from (select * from t) as tt", + out: "table expression is complex", + }, { + in: "update t set a=1", + out: "unrecognized statement: update t set a=1", + }, { + in: "bad query", + out: "syntax error at position 4 near 'bad'", + }} + + for _, tc := range testcases { + name, err := TableFromStatement(tc.in) + var got string + if err != nil { + got = err.Error() + } else { + got = String(name) + } + if got != tc.out { + t.Errorf("TableFromStatement('%s'): %s, want %s", tc.in, got, tc.out) + } + } +} + func TestGetTableName(t *testing.T) { testcases := []struct { in, out string diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 7eb63e19f96..72b44aa75a6 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -21,7 +21,6 @@ import ( "github.com/golang/protobuf/proto" "golang.org/x/net/context" - "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/event" @@ -29,6 +28,7 @@ import ( "vitess.io/vitess/go/vt/topo/events" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // This file contains keyspace utility functions @@ -62,25 +62,25 @@ func (ki *KeyspaceInfo) CheckServedFromMigration(tabletType topodatapb.TabletTyp // master is a special case with a few extra checks if tabletType == topodatapb.TabletType_MASTER { if !remove { - return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "cannot add master back to %v", ki.keyspace) + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot add master back to %v", ki.keyspace) } if len(cells) > 0 { - return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "cannot migrate only some cells for master removal in keyspace %v", ki.keyspace) + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot migrate only some cells for master removal in keyspace %v", ki.keyspace) } if len(ki.ServedFroms) > 1 { - return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "cannot migrate master into %v until everything else is migrated", ki.keyspace) + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot migrate master into %v until everything else is migrated", ki.keyspace) } } // we can't remove a type we don't have if ki.GetServedFrom(tabletType) == nil && remove { - return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "supplied type cannot be migrated") + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "supplied type cannot be migrated") } // check the keyspace is consistent in any case for _, ksf := range ki.ServedFroms { if ksf.Keyspace != keyspace { - return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "inconsistent keyspace specified in migration: %v != %v for type %v", keyspace, ksf.Keyspace, ksf.TabletType) + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "inconsistent keyspace specified in migration: %v != %v for type %v", keyspace, ksf.Keyspace, ksf.TabletType) } } @@ -129,7 +129,7 @@ func (ki *KeyspaceInfo) UpdateServedFromMap(tabletType topodatapb.TabletType, ce } } else { if ksf.Keyspace != keyspace { - return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "cannot UpdateServedFromMap on existing record for keyspace %v, different keyspace: %v != %v", ki.keyspace, ksf.Keyspace, keyspace) + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot UpdateServedFromMap on existing record for keyspace %v, different keyspace: %v != %v", ki.keyspace, ksf.Keyspace, keyspace) } ksf.Cells = addCells(ksf.Cells, cells) } @@ -240,6 +240,30 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string) return result, nil } +// GetServingShards returns all shards where the master is serving. +func (ts *Server) GetServingShards(ctx context.Context, keyspace string) ([]*ShardInfo, error) { + shards, err := ts.GetShardNames(ctx, keyspace) + if err != nil { + return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace) + } + + result := make([]*ShardInfo, 0, len(shards)) + for _, shard := range shards { + si, err := ts.GetShard(ctx, keyspace, shard) + if err != nil { + return nil, vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard) + } + if !si.IsMasterServing { + continue + } + result = append(result, si) + } + if len(result) == 0 { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%v has no serving shards", keyspace) + } + return result, nil +} + // GetOnlyShard returns the single ShardInfo of an unsharded keyspace. func (ts *Server) GetOnlyShard(ctx context.Context, keyspace string) (*ShardInfo, error) { allShards, err := ts.FindAllShardsInKeyspace(ctx, keyspace) @@ -251,7 +275,7 @@ func (ts *Server) GetOnlyShard(ctx context.Context, keyspace string) (*ShardInfo return s, nil } } - return nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "keyspace %s must have one and only one shard: %v", keyspace, allShards) + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "keyspace %s must have one and only one shard: %v", keyspace, allShards) } // DeleteKeyspace wraps the underlying Conn.Delete diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index a7050ded940..21c14504b34 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -126,6 +126,7 @@ import ( replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/proto/vttime" ) @@ -310,6 +311,9 @@ var commands = []commandGroup{ {"Reshard", commandReshard, "[-skip_schema_copy] ", "Start a Resharding process. Example: Reshard ks.workflow001 '0' '-80,80-'"}, + {"Materialize", commandMaterialize, + `, example : '{"workflow": "aaa", "source_keyspace": "source", "target_keyspace": "target", "table_settings": [{"target_table": "customer", "source_expression": "select * from customer", "create_ddl": "copy"}]}'`, + "Performs materialization based on the json spec."}, {"SplitClone", commandSplitClone, " ", "Start the SplitClone process to perform horizontal resharding. Example: SplitClone ks '0' '-80,80-'"}, @@ -1804,6 +1808,20 @@ func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.F return wr.Reshard(ctx, keyspace, workflow, source, target, *skipSchemaCopy) } +func commandMaterialize(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { + if err := subFlags.Parse(args); err != nil { + return err + } + if subFlags.NArg() != 1 { + return fmt.Errorf("a single argument is required: ") + } + ms := &vtctldatapb.MaterializeSettings{} + if err := json2.Unmarshal([]byte(subFlags.Arg(0)), ms); err != nil { + return err + } + return wr.Materialize(ctx, ms) +} + func commandSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { if err := subFlags.Parse(args); err != nil { return err diff --git a/go/vt/vtgate/vindexes/vschema.go b/go/vt/vtgate/vindexes/vschema.go index 50e776dd4a9..6bddad43359 100644 --- a/go/vt/vtgate/vindexes/vschema.go +++ b/go/vt/vtgate/vindexes/vschema.go @@ -614,6 +614,29 @@ func LoadFormalKeyspace(filename string) (*vschemapb.Keyspace, error) { return formal, nil } +// FindBestColVindex finds the best ColumnVindex for VReplication. +func FindBestColVindex(table *Table) (*ColumnVindex, error) { + if len(table.ColumnVindexes) == 0 { + return nil, fmt.Errorf("table %s has no vindex", table.Name.String()) + } + var result *ColumnVindex + for _, cv := range table.ColumnVindexes { + if cv.Vindex.NeedsVCursor() { + continue + } + if !cv.Vindex.IsUnique() { + continue + } + if result == nil || result.Vindex.Cost() > cv.Vindex.Cost() { + result = cv + } + } + if result == nil { + return nil, fmt.Errorf("could not find a vindex to compute keyspace id for table %v", table.Name.String()) + } + return result, nil +} + // FindVindexForSharding searches through the given slice // to find the lowest cost unique vindex // primary vindex is always unique diff --git a/go/vt/vtgate/vindexes/vschema_test.go b/go/vt/vtgate/vindexes/vschema_test.go index a54e78cbb1c..14088eed740 100644 --- a/go/vt/vtgate/vindexes/vschema_test.go +++ b/go/vt/vtgate/vindexes/vschema_test.go @@ -24,6 +24,8 @@ import ( "testing" "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/json2" "vitess.io/vitess/go/sqltypes" @@ -36,6 +38,27 @@ import ( vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) +// cheapVindex is a Functional, Unique Vindex. +type cheapVindex struct { + name string + Params map[string]string +} + +func (v *cheapVindex) String() string { return v.name } +func (*cheapVindex) Cost() int { return 0 } +func (*cheapVindex) IsUnique() bool { return true } +func (*cheapVindex) NeedsVCursor() bool { return false } +func (*cheapVindex) Verify(VCursor, []sqltypes.Value, [][]byte) ([]bool, error) { return []bool{}, nil } +func (*cheapVindex) Map(cursor VCursor, ids []sqltypes.Value) ([]key.Destination, error) { + return nil, nil +} + +func NewCheapVindex(name string, params map[string]string) (Vindex, error) { + return &cheapVindex{name: name, Params: params}, nil +} + +var _ SingleColumn = (*stFU)(nil) + // stFU is a Functional, Unique Vindex. type stFU struct { name string @@ -55,6 +78,25 @@ func NewSTFU(name string, params map[string]string) (Vindex, error) { var _ SingleColumn = (*stFU)(nil) +// stFN is a Functional, NonUnique Vindex. +type stFN struct { + name string + Params map[string]string +} + +func (v *stFN) String() string { return v.name } +func (*stFN) Cost() int { return 1 } +func (*stFN) IsUnique() bool { return false } +func (*stFN) NeedsVCursor() bool { return false } +func (*stFN) Verify(VCursor, []sqltypes.Value, [][]byte) ([]bool, error) { return []bool{}, nil } +func (*stFN) Map(cursor VCursor, ids []sqltypes.Value) ([]key.Destination, error) { return nil, nil } + +func NewSTFN(name string, params map[string]string) (Vindex, error) { + return &stFN{name: name, Params: params}, nil +} + +var _ SingleColumn = (*stFN)(nil) + // stLN is a Lookup, NonUnique Vindex. type stLN struct { name string @@ -64,7 +106,7 @@ type stLN struct { func (v *stLN) String() string { return v.name } func (*stLN) Cost() int { return 0 } func (*stLN) IsUnique() bool { return false } -func (*stLN) NeedsVCursor() bool { return false } +func (*stLN) NeedsVCursor() bool { return true } func (*stLN) Verify(VCursor, []sqltypes.Value, [][]byte) ([]bool, error) { return []bool{}, nil } func (*stLN) Map(cursor VCursor, ids []sqltypes.Value) ([]key.Destination, error) { return nil, nil } func (*stLN) Create(VCursor, [][]sqltypes.Value, [][]byte, bool) error { return nil } @@ -87,7 +129,7 @@ type stLU struct { func (v *stLU) String() string { return v.name } func (*stLU) Cost() int { return 2 } func (*stLU) IsUnique() bool { return true } -func (*stLU) NeedsVCursor() bool { return false } +func (*stLU) NeedsVCursor() bool { return true } func (*stLU) Verify(VCursor, []sqltypes.Value, [][]byte) ([]bool, error) { return []bool{}, nil } func (*stLU) Map(cursor VCursor, ids []sqltypes.Value) ([]key.Destination, error) { return nil, nil } func (*stLU) Create(VCursor, [][]sqltypes.Value, [][]byte, bool) error { return nil } @@ -113,7 +155,7 @@ type stLO struct { func (v *stLO) String() string { return v.name } func (*stLO) Cost() int { return 2 } func (*stLO) IsUnique() bool { return true } -func (*stLO) NeedsVCursor() bool { return false } +func (*stLO) NeedsVCursor() bool { return true } func (*stLO) Verify(VCursor, []sqltypes.Value, [][]byte) ([]bool, error) { return []bool{}, nil } func (*stLO) Map(cursor VCursor, ids []sqltypes.Value) ([]key.Destination, error) { return nil, nil } func (*stLO) Create(VCursor, [][]sqltypes.Value, [][]byte, bool) error { return nil } @@ -134,7 +176,9 @@ var _ SingleColumn = (*stLO)(nil) var _ Lookup = (*stLO)(nil) func init() { + Register("cheap", NewCheapVindex) Register("stfu", NewSTFU) + Register("stfn", NewSTFN) Register("stln", NewSTLN) Register("stlu", NewSTLU) Register("stlo", NewSTLO) @@ -776,6 +820,107 @@ func TestVSchemaRoutingRules(t *testing.T) { } } +func TestFindBestColVindex(t *testing.T) { + testSrvVSchema := &vschemapb.SrvVSchema{ + Keyspaces: map[string]*vschemapb.Keyspace{ + "ks1": { + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "stfu": { + Type: "stfu", + }, + "stfn": { + Type: "stfn", + }, + "stlu": { + Type: "stlu", + }, + "stln": { + Type: "stln", + }, + "cheap": { + Type: "cheap", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "stfu", + Columns: []string{"id"}, + }}, + }, + "nogoodvindex": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "stlu", + Columns: []string{"id"}, + }}, + }, + "thirdvindexgood": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "stlu", + Columns: []string{"id"}, + }, { + Name: "stfn", + Columns: []string{"id"}, + }, { + Name: "stfu", + Columns: []string{"id"}, + }}, + }, + "cheapest": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "stfu", + Columns: []string{"id"}, + }, { + Name: "cheap", + Columns: []string{"id"}, + }}, + }, + }, + }, + "unsharded": { + Tables: map[string]*vschemapb.Table{ + "t2": {}, + }, + }, + }, + } + vschema, err := BuildVSchema(testSrvVSchema) + require.NoError(t, err) + + testcases := []struct { + tablename string + vindexname string + err string + }{{ + tablename: "t1", + vindexname: "stfu", + }, { + tablename: "nogoodvindex", + err: "could not find a vindex to compute keyspace id for table nogoodvindex", + }, { + tablename: "thirdvindexgood", + vindexname: "stfu", + }, { + tablename: "cheapest", + vindexname: "cheap", + }, { + tablename: "t2", + err: "table t2 has no vindex", + }} + for _, tcase := range testcases { + table, err := vschema.FindTable("", tcase.tablename) + require.NoError(t, err) + cv, err := FindBestColVindex(table) + if err != nil { + assert.EqualError(t, err, tcase.err, tcase.tablename) + continue + } + assert.NoError(t, err, tcase.tablename) + assert.Equal(t, cv.Name, tcase.vindexname, tcase.tablename) + } +} + func TestFindVindexForSharding(t *testing.T) { ks := &Keyspace{ Name: "sharded", diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 09d9fc29c3d..5e9a9b75eae 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -105,7 +105,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor if ct.source.GetExternalMysql() == "" { // tabletPicker - if v, ok := params["cell"]; ok { + if v := params["cell"]; v != "" { cell = v } if v := params["tablet_types"]; v != "" { diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index a9c1678fde3..aff689f5569 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -491,11 +491,9 @@ func (vre *Engine) transitionJournal(key string) { for _, sgtid := range je.journal.ShardGtids { bls := vre.controllers[refid].source bls.Keyspace, bls.Shard = sgtid.Keyspace, sgtid.Shard - query := fmt.Sprintf("insert into _vt.vreplication "+ - "(workflow, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state, db_name) "+ - "values (%v, %v, %v, %v, %v, %v, %v, 0, '%v', %v)", - encodeString(params["workflow"]), encodeString(bls.String()), encodeString(sgtid.Gtid), params["max_tps"], params["max_replication_lag"], encodeString(params["tablet_types"]), time.Now().Unix(), binlogplayer.BlpRunning, encodeString(vre.dbName)) - qr, err := vre.executeFetchMaybeCreateTable(dbClient, query, 1) + ig := NewInsertGenerator(binlogplayer.BlpRunning, vre.dbName) + ig.AddRow(params["workflow"], &bls, sgtid.Gtid, params["cell"], params["tablet_types"]) + qr, err := vre.executeFetchMaybeCreateTable(dbClient, ig.String(), 1) if err != nil { log.Errorf("transitionJournal: %v", err) return diff --git a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go new file mode 100644 index 00000000000..230639e30c0 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go @@ -0,0 +1,71 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "fmt" + "strings" + "time" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/throttler" +) + +// InsertGenerator generates a vreplication insert statement. +type InsertGenerator struct { + buf *strings.Builder + prefix string + + state string + dbname string + now int64 +} + +// NewInsertGenerator creates a new InsertGenerator. +func NewInsertGenerator(state, dbname string) *InsertGenerator { + buf := &strings.Builder{} + buf.WriteString("insert into _vt.vreplication(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name) values ") + return &InsertGenerator{ + buf: buf, + state: state, + dbname: dbname, + now: time.Now().Unix(), + } +} + +// AddRow adds a row to the insert statement. +func (ig *InsertGenerator) AddRow(workflow string, bls *binlogdatapb.BinlogSource, pos, cell, tabletTypes string) { + fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, '%v', %v)", + ig.prefix, + encodeString(workflow), + encodeString(bls.String()), + encodeString(pos), + throttler.MaxRateModuleDisabled, + throttler.ReplicationLagModuleDisabled, + encodeString(cell), + encodeString(tabletTypes), + ig.now, + ig.state, + encodeString(ig.dbname), + ) + ig.prefix = ", " +} + +// String returns the generated statement. +func (ig *InsertGenerator) String() string { + return ig.buf.String() +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/insert_generator_test.go b/go/vt/vttablet/tabletmanager/vreplication/insert_generator_test.go new file mode 100644 index 00000000000..00287a39714 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/insert_generator_test.go @@ -0,0 +1,38 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +func TestInsertGenerator(t *testing.T) { + ig := NewInsertGenerator(binlogplayer.BlpStopped, "a") + ig.now = 111 + ig.AddRow("b", &binlogdatapb.BinlogSource{Keyspace: "c"}, "d", "e", "f") + want := `insert into _vt.vreplication(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name) values ` + + `('b', 'keyspace:\"c\" ', 'd', 9223372036854775807, 9223372036854775807, 'e', 'f', 111, 0, 'Stopped', 'a')` + assert.Equal(t, ig.String(), want) + + ig.AddRow("g", &binlogdatapb.BinlogSource{Keyspace: "h"}, "i", "j", "k") + want += `, ('g', 'keyspace:\"h\" ', 'i', 9223372036854775807, 9223372036854775807, 'j', 'k', 111, 0, 'Stopped', 'a')` + assert.Equal(t, ig.String(), want) +} diff --git a/go/vt/vttablet/tabletserver/vstreamer/local_vschema.go b/go/vt/vttablet/tabletserver/vstreamer/local_vschema.go index a7e44288317..21696be3a97 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/local_vschema.go +++ b/go/vt/vttablet/tabletserver/vstreamer/local_vschema.go @@ -36,7 +36,7 @@ func (lvs *localVSchema) FindColVindex(tablename string) (*vindexes.ColumnVindex if err != nil { return nil, err } - return identifyColVindex(table) + return vindexes.FindBestColVindex(table) } func (lvs *localVSchema) FindOrCreateVindex(qualifiedName string) (vindexes.Vindex, error) { @@ -74,27 +74,3 @@ func (lvs *localVSchema) findTable(tablename string) (*vindexes.Table, error) { } return table, nil } - -func identifyColVindex(table *vindexes.Table) (*vindexes.ColumnVindex, error) { - if len(table.ColumnVindexes) == 0 { - return nil, fmt.Errorf("table %s has no vindex", table.Name.String()) - } - var result *vindexes.ColumnVindex - for _, cv := range table.ColumnVindexes { - if cv.Vindex.NeedsVCursor() { - continue - } - if !cv.Vindex.IsUnique() { - // This is currently unreachable because all existing non-unique vindexes - // need vcursor. - continue - } - if result == nil || result.Vindex.Cost() > cv.Vindex.Cost() { - result = cv - } - } - if result == nil { - return nil, fmt.Errorf("could not find a vindex to compute keyspace id for table %v", table.Name.String()) - } - return result, nil -} diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index 7a53014f464..f4d901c1421 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -101,7 +101,7 @@ func (wr *Wrangler) validateNewWorkflow(ctx context.Context, keyspace, workflow allErrors := &concurrency.AllErrorRecorder{} for _, si := range allshards { if si.MasterAlias == nil { - allErrors.RecordError(fmt.Errorf("shard has no master: %v", si)) + allErrors.RecordError(fmt.Errorf("shard has no master: %v", si.ShardName())) continue } wg.Add(1) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go new file mode 100644 index 00000000000..d2b9d0484a3 --- /dev/null +++ b/go/vt/wrangler/materializer.go @@ -0,0 +1,291 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "fmt" + "strings" + "sync" + "text/template" + + "golang.org/x/net/context" + + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/key" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/vindexes" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" +) + +type materializer struct { + wr *Wrangler + ms *vtctldatapb.MaterializeSettings + targetVSchema *vindexes.KeyspaceSchema + sourceShards []*topo.ShardInfo + targetShards []*topo.ShardInfo +} + +// Materialize performs the steps needed to materialize a list of tables based on the materialization specs. +func (wr *Wrangler) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSettings) error { + if err := wr.validateNewWorkflow(ctx, ms.TargetKeyspace, ms.Workflow); err != nil { + return err + } + mz, err := wr.buildMaterializer(ctx, ms) + if err != nil { + return err + } + if err := mz.deploySchema(ctx); err != nil { + return err + } + inserts, err := mz.generateInserts(ctx) + if err != nil { + return err + } + if err := mz.createStreams(ctx, inserts); err != nil { + return err + } + return mz.startStreams(ctx) +} + +func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.MaterializeSettings) (*materializer, error) { + vschema, err := wr.ts.GetVSchema(ctx, ms.TargetKeyspace) + if err != nil { + return nil, err + } + targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema, ms.TargetKeyspace) + if err != nil { + return nil, err + } + if targetVSchema.Keyspace.Sharded { + for _, ts := range ms.TableSettings { + if targetVSchema.Tables[ts.TargetTable] == nil { + return nil, fmt.Errorf("table %s not found in vschema for keyspace %s", ts.TargetTable, ms.TargetKeyspace) + } + } + } + + sourceShards, err := wr.ts.GetServingShards(ctx, ms.SourceKeyspace) + if err != nil { + return nil, err + } + targetShards, err := wr.ts.GetServingShards(ctx, ms.TargetKeyspace) + if err != nil { + return nil, err + } + return &materializer{ + wr: wr, + ms: ms, + targetVSchema: targetVSchema, + sourceShards: sourceShards, + targetShards: targetShards, + }, nil +} + +func (mz *materializer) deploySchema(ctx context.Context) error { + return mz.forAllTargets(func(target *topo.ShardInfo) error { + for _, ts := range mz.ms.TableSettings { + tableSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, []string{ts.TargetTable}, nil, false) + if err != nil { + return err + } + if len(tableSchema.TableDefinitions) != 0 { + // Table already exists. + continue + } + if ts.CreateDdl == "" { + return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable) + } + createddl := ts.CreateDdl + if createddl == "copy" { + sourceTableName, err := sqlparser.TableFromStatement(ts.SourceExpression) + if err != nil { + return err + } + if sourceTableName.Name.String() != ts.TargetTable { + return fmt.Errorf("source and target table names must match for copying schema: %v vs %v", sqlparser.String(sourceTableName), ts.TargetTable) + } + sourceMaster := mz.sourceShards[0].MasterAlias + if sourceMaster == nil { + return fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName()) + } + sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, []string{ts.TargetTable}, nil, false) + if err != nil { + return err + } + if len(sourceSchema.TableDefinitions) == 0 { + return fmt.Errorf("source table %v does not exist", ts.TargetTable) + } + createddl = sourceSchema.TableDefinitions[0].Schema + } + targetTablet, err := mz.wr.ts.GetTablet(ctx, target.MasterAlias) + if err != nil { + return err + } + if _, err := mz.wr.tmc.ExecuteFetchAsDba(ctx, targetTablet.Tablet, false, []byte(createddl), 0, false, true); err != nil { + return err + } + } + return nil + }) +} + +func (mz *materializer) generateInserts(ctx context.Context) (string, error) { + ig := vreplication.NewInsertGenerator(binlogplayer.BlpStopped, "{{.dbname}}") + + for _, source := range mz.sourceShards { + bls := &binlogdatapb.BinlogSource{ + Keyspace: mz.ms.SourceKeyspace, + Shard: source.ShardName(), + Filter: &binlogdatapb.Filter{}, + } + for _, ts := range mz.ms.TableSettings { + rule := &binlogdatapb.Rule{ + Match: ts.TargetTable, + } + // Validate the query. + stmt, err := sqlparser.Parse(ts.SourceExpression) + if err != nil { + return "", err + } + sel, ok := stmt.(*sqlparser.Select) + if !ok { + return "", fmt.Errorf("unrecognized statement: %s", ts.SourceExpression) + } + if mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { + cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable]) + if err != nil { + return "", err + } + mappedCols := make([]*sqlparser.ColName, 0, len(cv.Columns)) + for _, col := range cv.Columns { + colName, err := matchColInSelect(col, sel) + if err != nil { + return "", err + } + mappedCols = append(mappedCols, colName) + } + subExprs := make(sqlparser.SelectExprs, 0, len(mappedCols)+2) + for _, mappedCol := range mappedCols { + subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: mappedCol}) + } + vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) + subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrVal([]byte(vindexName))}) + subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrVal([]byte("'{{.keyrange}}'"))}) + sel.Where = &sqlparser.Where{ + Type: sqlparser.WhereStr, + Expr: &sqlparser.FuncExpr{ + Name: sqlparser.NewColIdent("in_keyrange"), + Exprs: subExprs, + }, + } + rule.Filter = sqlparser.String(sel) + } else { + rule.Filter = ts.SourceExpression + } + bls.Filter.Rules = append(bls.Filter.Rules, rule) + } + ig.AddRow(mz.ms.Workflow, bls, "", mz.ms.Cell, mz.ms.TabletTypes) + } + return ig.String(), nil +} + +func matchColInSelect(col sqlparser.ColIdent, sel *sqlparser.Select) (*sqlparser.ColName, error) { + for _, selExpr := range sel.SelectExprs { + switch selExpr := selExpr.(type) { + case *sqlparser.StarExpr: + return &sqlparser.ColName{Name: col}, nil + case *sqlparser.AliasedExpr: + match := selExpr.As + if match.IsEmpty() { + if colExpr, ok := selExpr.Expr.(*sqlparser.ColName); ok { + match = colExpr.Name + } else { + // Cannot match against a complex expression. + continue + } + } + if match.Equal(col) { + colExpr, ok := selExpr.Expr.(*sqlparser.ColName) + if !ok { + return nil, fmt.Errorf("vindex column cannot be a complex expression: %v", sqlparser.String(selExpr)) + } + return colExpr, nil + } + default: + return nil, fmt.Errorf("unsupported select expression: %v", sqlparser.String(selExpr)) + } + } + return nil, fmt.Errorf("could not find vindex column %v", sqlparser.String(col)) +} + +func (mz *materializer) createStreams(ctx context.Context, inserts string) error { + return mz.forAllTargets(func(target *topo.ShardInfo) error { + targetMaster, err := mz.wr.ts.GetTablet(ctx, target.MasterAlias) + if err != nil { + return vterrors.Wrapf(err, "GetTablet(%v) failed", target.MasterAlias) + } + buf := &strings.Builder{} + t := template.Must(template.New("").Parse(inserts)) + input := map[string]string{ + "keyrange": key.KeyRangeString(target.KeyRange), + "dbname": targetMaster.DbName(), + } + if err := t.Execute(buf, input); err != nil { + return err + } + if _, err := mz.wr.TabletManagerClient().VReplicationExec(ctx, targetMaster.Tablet, buf.String()); err != nil { + return err + } + return nil + }) +} + +func (mz *materializer) startStreams(ctx context.Context) error { + return mz.forAllTargets(func(target *topo.ShardInfo) error { + targetMaster, err := mz.wr.ts.GetTablet(ctx, target.MasterAlias) + if err != nil { + return vterrors.Wrapf(err, "GetTablet(%v) failed", target.MasterAlias) + } + query := fmt.Sprintf("update _vt.vreplication set state='Running' where db_name=%s and workflow=%s", encodeString(targetMaster.DbName()), encodeString(mz.ms.Workflow)) + if _, err := mz.wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, query); err != nil { + return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetMaster.Tablet, query) + } + return nil + }) +} + +func (mz *materializer) forAllTargets(f func(*topo.ShardInfo) error) error { + var wg sync.WaitGroup + allErrors := &concurrency.AllErrorRecorder{} + for _, target := range mz.targetShards { + wg.Add(1) + go func(target *topo.ShardInfo) { + defer wg.Done() + + if err := f(target); err != nil { + allErrors.RecordError(err) + } + }(target) + } + wg.Wait() + return allErrors.AggrError(vterrors.Aggregate) +} diff --git a/go/vt/wrangler/materializer_env_test.go b/go/vt/wrangler/materializer_env_test.go new file mode 100644 index 00000000000..96219b6d42f --- /dev/null +++ b/go/vt/wrangler/materializer_env_test.go @@ -0,0 +1,225 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "fmt" + "regexp" + "sync" + "testing" + + "golang.org/x/net/context" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/logutil" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vttablet/tmclient" +) + +type testMaterializerEnv struct { + wr *Wrangler + ms *vtctldatapb.MaterializeSettings + sources []string + targets []string + tablets map[int]*topodatapb.Tablet + topoServ *topo.Server + cell string + tmc *testMaterializerTMClient +} + +//---------------------------------------------- +// testMaterializerEnv + +func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, sources, targets []string) *testMaterializerEnv { + t.Helper() + env := &testMaterializerEnv{ + ms: ms, + sources: sources, + targets: targets, + tablets: make(map[int]*topodatapb.Tablet), + topoServ: memorytopo.NewServer("cell"), + cell: "cell", + tmc: newTestMaterializerTMClient(), + } + env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc) + + tabletID := 100 + for _, shard := range sources { + _ = env.addTablet(tabletID, env.ms.SourceKeyspace, shard, topodatapb.TabletType_MASTER) + tabletID += 10 + } + tabletID = 200 + for _, shard := range targets { + _ = env.addTablet(tabletID, env.ms.TargetKeyspace, shard, topodatapb.TabletType_MASTER) + tabletID += 10 + } + + for _, ts := range ms.TableSettings { + tablename := ts.TargetTable + table, err := sqlparser.TableFromStatement(ts.SourceExpression) + if err == nil { + tablename = table.Name.String() + } + env.tmc.schema[ms.SourceKeyspace+"."+tablename] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Schema: fmt.Sprintf("%s_schema", tablename), + }}, + } + env.tmc.schema[ms.TargetKeyspace+"."+ts.TargetTable] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Schema: fmt.Sprintf("%s_schema", ts.TargetTable), + }}, + } + } + return env +} + +func (env *testMaterializerEnv) expectValidation() { + for _, tablet := range env.tablets { + tabletID := int(tablet.Alias.Uid) + if tabletID < 200 { + continue + } + // wr.validateNewWorkflow + env.tmc.expectVRQuery(tabletID, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.ms.TargetKeyspace, env.ms.Workflow), &sqltypes.Result{}) + } +} + +func (env *testMaterializerEnv) close() { + for _, t := range env.tablets { + env.deleteTablet(t) + } +} + +func (env *testMaterializerEnv) addTablet(id int, keyspace, shard string, tabletType topodatapb.TabletType) *topodatapb.Tablet { + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: env.cell, + Uid: uint32(id), + }, + Keyspace: keyspace, + Shard: shard, + KeyRange: &topodatapb.KeyRange{}, + Type: tabletType, + PortMap: map[string]int32{ + "test": int32(id), + }, + } + env.tablets[id] = tablet + if err := env.wr.InitTablet(context.Background(), tablet, false /* allowMasterOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil { + panic(err) + } + if tabletType == topodatapb.TabletType_MASTER { + _, err := env.wr.ts.UpdateShardFields(context.Background(), keyspace, shard, func(si *topo.ShardInfo) error { + si.MasterAlias = tablet.Alias + return nil + }) + if err != nil { + panic(err) + } + } + return tablet +} + +func (env *testMaterializerEnv) deleteTablet(tablet *topodatapb.Tablet) { + env.topoServ.DeleteTablet(context.Background(), tablet.Alias) + delete(env.tablets, int(tablet.Alias.Uid)) +} + +//---------------------------------------------- +// testMaterializerTMClient + +type testMaterializerTMClient struct { + tmclient.TabletManagerClient + schema map[string]*tabletmanagerdatapb.SchemaDefinition + + mu sync.Mutex + vrQueries map[int][]*queryResult +} + +func newTestMaterializerTMClient() *testMaterializerTMClient { + return &testMaterializerTMClient{ + schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), + vrQueries: make(map[int][]*queryResult), + } +} + +func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { + key := tablet.Keyspace + "." + tables[0] + if tmc.schema[key] == nil { + return &tabletmanagerdatapb.SchemaDefinition{}, nil + } + return tmc.schema[key], nil +} + +func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, result *sqltypes.Result) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + tmc.vrQueries[tabletID] = append(tmc.vrQueries[tabletID], &queryResult{ + query: query, + result: sqltypes.ResultToProto3(result), + }) +} + +func (tmc *testMaterializerTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + qrs := tmc.vrQueries[int(tablet.Alias.Uid)] + if len(qrs) == 0 { + return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query) + } + matched := false + if qrs[0].query[0] == '/' { + matched = regexp.MustCompile(qrs[0].query[1:]).MatchString(query) + } else { + matched = query == qrs[0].query + } + if !matched { + return nil, fmt.Errorf("tablet %v: unexpected query %s, want: %s", tablet, query, qrs[0].query) + } + tmc.vrQueries[int(tablet.Alias.Uid)] = qrs[1:] + return qrs[0].result, nil +} + +func (tmc *testMaterializerTMClient) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, query []byte, maxRows int, disableBinlogs, reloadSchema bool) (*querypb.QueryResult, error) { + // Reuse VReplicationExec + return tmc.VReplicationExec(ctx, tablet, string(query)) +} + +func (tmc *testMaterializerTMClient) verifyQueries(t *testing.T) { + t.Helper() + + tmc.mu.Lock() + defer tmc.mu.Unlock() + + for tabletID, qrs := range tmc.vrQueries { + if len(qrs) != 0 { + var list []string + for _, qr := range qrs { + list = append(list, qr.query) + } + t.Errorf("tablet %v: has unreturned results: %v", tabletID, list) + } + } +} diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go new file mode 100644 index 00000000000..f8432e36e5c --- /dev/null +++ b/go/vt/wrangler/materializer_test.go @@ -0,0 +1,757 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/logutil" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +const mzUpdateQuery = "update _vt.vreplication set state='Running' where db_name='vt_targetks' and workflow='workflow'" + +func TestMaterializerOneToOne(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + Workflow: "workflow", + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + TableSettings: []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: "t1", + SourceExpression: "select * from t1", + CreateDdl: "t1ddl", + }, { + TargetTable: "t2", + SourceExpression: "select * from t3", + CreateDdl: "t2ddl", + }}, + } + env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + defer env.close() + env.expectValidation() + + env.tmc.expectVRQuery( + 200, + insertPrefix+ + `\('workflow', 'keyspace:\\"sourceks\\" shard:\\"0\\" filter: rules: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_targetks'\)`+ + eol, + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{}) + + err := env.wr.Materialize(context.Background(), ms) + assert.NoError(t, err) + env.tmc.verifyQueries(t) +} + +func TestMaterializerManyToOne(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + Workflow: "workflow", + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + TableSettings: []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: "t1", + SourceExpression: "select * from t1", + CreateDdl: "t1ddl", + }, { + TargetTable: "t2", + SourceExpression: "select * from t3", + CreateDdl: "t2ddl", + }}, + } + env := newTestMaterializerEnv(t, ms, []string{"-80", "80-"}, []string{"0"}) + defer env.close() + env.expectValidation() + + env.tmc.expectVRQuery( + 200, + insertPrefix+ + `\('workflow', 'keyspace:\\"sourceks\\" shard:\\"-80\\" filter: rules: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_targetks'\)`+ + `, `+ + `\('workflow', 'keyspace:\\"sourceks\\" shard:\\"80-\\" filter: rules: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_targetks'\)`+ + eol, + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{}) + + err := env.wr.Materialize(context.Background(), ms) + assert.NoError(t, err) + env.tmc.verifyQueries(t) +} + +func TestMaterializerOneToMany(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + Workflow: "workflow", + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + TableSettings: []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: "t1", + SourceExpression: "select * from t1", + CreateDdl: "t1ddl", + }}, + } + env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"-80", "80-"}) + defer env.close() + + vs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + }, + } + + if err := env.topoServ.SaveVSchema(context.Background(), "targetks", vs); err != nil { + t.Fatal(err) + } + env.expectValidation() + + env.tmc.expectVRQuery( + 200, + insertPrefix+ + `.*shard:\\"0\\" filter: rules: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_targetks'\)`+ + eol, + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{}) + + err := env.wr.Materialize(context.Background(), ms) + assert.NoError(t, err) + env.tmc.verifyQueries(t) +} + +func TestMaterializerCopySchema(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + Workflow: "workflow", + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + TableSettings: []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: "t1", + SourceExpression: "select * from t1", + CreateDdl: "copy", + }, { + TargetTable: "t2", + SourceExpression: "select * from t3", + CreateDdl: "t2ddl", + }}, + } + env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + defer env.close() + env.expectValidation() + + delete(env.tmc.schema, "targetks.t1") + + env.tmc.expectVRQuery(200, `t1_schema`, &sqltypes.Result{}) + env.tmc.expectVRQuery( + 200, + insertPrefix+ + `\('workflow', 'keyspace:\\"sourceks\\" shard:\\"0\\" filter: rules: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_targetks'\)`+ + eol, + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{}) + + err := env.wr.Materialize(context.Background(), ms) + assert.NoError(t, err) + env.tmc.verifyQueries(t) +} + +func TestMaterializerExplicitColumns(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + Workflow: "workflow", + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + TableSettings: []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: "t1", + SourceExpression: "select c1, c1+c2, c2 from t1", + CreateDdl: "t1ddl", + }}, + } + env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"-80", "80-"}) + defer env.close() + + vs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "region": { + Type: "region_experimental", + Params: map[string]string{ + "region_bytes": "1", + }, + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Columns: []string{"c1", "c2"}, + Name: "region", + }}, + }, + }, + } + + if err := env.topoServ.SaveVSchema(context.Background(), "targetks", vs); err != nil { + t.Fatal(err) + } + env.expectValidation() + + env.tmc.expectVRQuery( + 200, + insertPrefix+ + `.*shard:\\"0\\" filter: