Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize schema names in foreign key constraints #8461

Merged
merged 10 commits into from
Oct 17, 2024
6 changes: 3 additions & 3 deletions go/cmd/dolt/commands/cvcmds/verify_constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,16 @@ func (cmd VerifyConstraintsCmd) Exec(ctx context.Context, commandStr string, arg
}

for _, tableName := range tablesWithViolations.AsSortedSlice() {
tbl, ok, err := endRoot.GetTable(ctx, doltdb.TableName{Name: tableName})
tbl, ok, err := endRoot.GetTable(ctx, tableName)
if err != nil {
return commands.HandleVErrAndExitCode(errhand.BuildDError("Error loading table.").AddCause(err).Build(), nil)
}
if !ok {
return commands.HandleVErrAndExitCode(errhand.BuildDError("Unable to load table '%s'.", tableName).Build(), nil)
}
cli.Println("")
cli.Println(doltdb.DoltConstViolTablePrefix + tableName)
dErr := printViolationsForTable(ctx, dbName, tableName, tbl, eng)
cli.Println(doltdb.DoltConstViolTablePrefix + tableName.Name)
dErr := printViolationsForTable(ctx, dbName, tableName.Name, tbl, eng)
if dErr != nil {
return commands.HandleVErrAndExitCode(dErr, nil)
}
Expand Down
6 changes: 2 additions & 4 deletions go/libraries/doltcore/diff/table_deltas.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,7 @@ func GetTableDeltas(ctx context.Context, fromRoot, toRoot doltdb.RootValue) (del
func getFkParentSchs(ctx context.Context, root doltdb.RootValue, fks ...doltdb.ForeignKey) (map[doltdb.TableName]schema.Schema, error) {
schs := make(map[doltdb.TableName]schema.Schema)
for _, toFk := range fks {
// TODO: schema
toRefTable, _, ok, err := doltdb.GetTableInsensitive(ctx, root, doltdb.TableName{Name: toFk.ReferencedTableName})
toRefTable, _, ok, err := doltdb.GetTableInsensitive(ctx, root, toFk.ReferencedTableName)
if err != nil {
return nil, err
}
Expand All @@ -228,8 +227,7 @@ func getFkParentSchs(ctx context.Context, root doltdb.RootValue, fks ...doltdb.F
if err != nil {
return nil, err
}
// TODO: schema name
schs[doltdb.TableName{Name: toFk.ReferencedTableName}] = toRefSch
schs[toFk.ReferencedTableName] = toRefSch
}
return schs, nil
}
Expand Down
42 changes: 22 additions & 20 deletions go/libraries/doltcore/doltdb/foreign_key_coll.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ const (
// ForeignKey is the complete, internal representation of a Foreign Key.
type ForeignKey struct {
Name string `noms:"name" json:"name"`
TableName string `noms:"tbl_name" json:"tbl_name"`
TableName TableName `noms:"tbl_name" json:"tbl_name"`
TableIndex string `noms:"tbl_index" json:"tbl_index"`
TableColumns []uint64 `noms:"tbl_cols" json:"tbl_cols"`
ReferencedTableName string `noms:"ref_tbl_name" json:"ref_tbl_name"`
ReferencedTableName TableName `noms:"ref_tbl_name" json:"ref_tbl_name"`
ReferencedTableIndex string `noms:"ref_tbl_index" json:"ref_tbl_index"`
ReferencedTableColumns []uint64 `noms:"ref_tbl_cols" json:"ref_tbl_cols"`
OnUpdate ForeignKeyReferentialAction `noms:"on_update" json:"on_update"`
Expand Down Expand Up @@ -167,7 +167,7 @@ func (fk ForeignKey) Equals(other ForeignKey, fkSchemasByName, otherSchemasByNam
}
for i, tag := range resolvedFK.TableColumns {
unresolvedColName := unresolvedFK.UnresolvedFKDetails.TableColumns[i]
resolvedSch, ok := resolvedSchemasByName[TableName{Name: resolvedFK.TableName}]
resolvedSch, ok := resolvedSchemasByName[resolvedFK.TableName]
if !ok {
return false
}
Expand All @@ -186,7 +186,7 @@ func (fk ForeignKey) Equals(other ForeignKey, fkSchemasByName, otherSchemasByNam
}
for i, tag := range resolvedFK.ReferencedTableColumns {
unresolvedColName := unresolvedFK.UnresolvedFKDetails.ReferencedTableColumns[i]
resolvedSch, ok := resolvedSchemasByName[TableName{Name: unresolvedFK.ReferencedTableName}]
resolvedSch, ok := resolvedSchemasByName[unresolvedFK.ReferencedTableName]
if !ok {
return false
}
Expand Down Expand Up @@ -246,6 +246,8 @@ func (fk ForeignKey) HashOf() (hash.Hash, error) {
_, err = bb.Write(t)
case uint64:
err = binary.Write(&bb, binary.LittleEndian, t)
case TableName:
_, err = bb.Write([]byte(t.String()))
default:
return hash.Hash{}, fmt.Errorf("unsupported type %T", t)
}
Expand Down Expand Up @@ -278,7 +280,7 @@ func CombinedHash(fks []ForeignKey) (hash.Hash, error) {

// IsSelfReferential returns whether the table declaring the foreign key is also referenced by the foreign key.
func (fk ForeignKey) IsSelfReferential() bool {
return strings.EqualFold(fk.TableName, fk.ReferencedTableName)
return fk.TableName.EqualFold(fk.ReferencedTableName)
}

// IsResolved returns whether the foreign key has been resolved.
Expand Down Expand Up @@ -543,13 +545,13 @@ OuterLoopResolved:
len(fk.ReferencedTableColumns) != len(existingFk.UnresolvedFKDetails.ReferencedTableColumns) {
continue
}
// TODO: schema name
tblSch, ok := allSchemas[TableName{Name: existingFk.TableName}]

tblSch, ok := allSchemas[existingFk.TableName]
if !ok {
continue
}
// TODO: schema name
refTblSch, ok := allSchemas[TableName{Name: existingFk.ReferencedTableName}]

refTblSch, ok := allSchemas[existingFk.ReferencedTableName]
if !ok {
continue
}
Expand Down Expand Up @@ -590,10 +592,10 @@ func (fkc *ForeignKeyCollection) Iter(cb func(fk ForeignKey) (stop bool, err err
// it will be present in both declaresFk and referencedByFk. Each array is sorted by name ascending.
func (fkc *ForeignKeyCollection) KeysForTable(tableName TableName) (declaredFk, referencedByFk []ForeignKey) {
for _, foreignKey := range fkc.foreignKeys {
if strings.EqualFold(foreignKey.TableName, tableName.Name) {
if foreignKey.TableName.EqualFold(tableName) {
declaredFk = append(declaredFk, foreignKey)
}
if strings.EqualFold(foreignKey.ReferencedTableName, tableName.Name) {
if foreignKey.ReferencedTableName.EqualFold(tableName) {
referencedByFk = append(referencedByFk, foreignKey)
}
}
Expand Down Expand Up @@ -643,9 +645,9 @@ func (fkc *ForeignKeyCollection) RemoveKeyByName(foreignKeyName string) bool {
func (fkc *ForeignKeyCollection) RemoveTables(ctx context.Context, tables ...TableName) error {
outgoing := NewTableNameSet(tables)
for _, fk := range fkc.foreignKeys {
// TODO: schema names
dropChild := outgoing.Contains(TableName{Name: fk.TableName})
dropParent := outgoing.Contains(TableName{Name: fk.ReferencedTableName})

dropChild := outgoing.Contains(fk.TableName)
dropParent := outgoing.Contains(fk.ReferencedTableName)
if dropParent && !dropChild {
return fmt.Errorf("unable to remove `%s` since it is referenced from table `%s`", fk.ReferencedTableName, fk.TableName)
}
Expand All @@ -666,8 +668,8 @@ func (fkc *ForeignKeyCollection) RemoveTables(ctx context.Context, tables ...Tab
func (fkc *ForeignKeyCollection) RemoveAndUnresolveTables(ctx context.Context, root RootValue, tables ...TableName) error {
outgoing := NewTableNameSet(tables)
for _, fk := range fkc.foreignKeys {
dropChild := outgoing.Contains(TableName{Name: fk.TableName})
dropParent := outgoing.Contains(TableName{Name: fk.ReferencedTableName})
dropChild := outgoing.Contains(fk.TableName)
dropParent := outgoing.Contains(fk.ReferencedTableName)
if dropParent && !dropChild {
if !fk.IsResolved() {
continue
Expand All @@ -682,7 +684,7 @@ func (fkc *ForeignKeyCollection) RemoveAndUnresolveTables(ctx context.Context, r
fk.UnresolvedFKDetails.TableColumns = make([]string, len(fk.TableColumns))
fk.UnresolvedFKDetails.ReferencedTableColumns = make([]string, len(fk.ReferencedTableColumns))

tbl, ok, err := root.GetTable(ctx, TableName{Name: fk.TableName})
tbl, ok, err := root.GetTable(ctx, fk.TableName)
if err != nil {
return err
}
Expand All @@ -703,7 +705,7 @@ func (fkc *ForeignKeyCollection) RemoveAndUnresolveTables(ctx context.Context, r
fk.UnresolvedFKDetails.TableColumns[i] = col.Name
}

refTbl, ok, err := root.GetTable(ctx, TableName{Name: fk.ReferencedTableName})
refTbl, ok, err := root.GetTable(ctx, fk.ReferencedTableName)
if err != nil {
return err
}
Expand Down Expand Up @@ -747,8 +749,8 @@ func (fkc *ForeignKeyCollection) RemoveAndUnresolveTables(ctx context.Context, r
}

// Tables returns the set of all tables that either declare a foreign key or are referenced by a foreign key.
func (fkc *ForeignKeyCollection) Tables() map[string]struct{} {
tables := make(map[string]struct{})
func (fkc *ForeignKeyCollection) Tables() map[TableName]struct{} {
tables := make(map[TableName]struct{})
for _, fk := range fkc.foreignKeys {
tables[fk.TableName] = struct{}{}
tables[fk.ReferencedTableName] = struct{}{}
Expand Down
18 changes: 14 additions & 4 deletions go/libraries/doltcore/doltdb/foreign_key_serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,22 @@ func deserializeFlatbufferForeignKeys(msg types.SerialMessage) (*ForeignKeyColle
}
}

tableName, ok := decodeTableNameFromSerialization(string(fk.ChildTableName()))
if !ok {
return nil, fmt.Errorf("could not decode table name: %s", string(fk.ChildTableName()))
}

parentTableName, ok := decodeTableNameFromSerialization(string(fk.ParentTableName()))
if !ok {
return nil, fmt.Errorf("could not decode table name: %s", string(fk.ParentTableName()))
}

err := collection.AddKeys(ForeignKey{
Name: string(fk.Name()),
TableName: string(fk.ChildTableName()),
TableName: tableName,
TableIndex: string(fk.ChildTableIndex()),
TableColumns: childCols,
ReferencedTableName: string(fk.ParentTableName()),
ReferencedTableName: parentTableName,
ReferencedTableIndex: string(fk.ParentTableIndex()),
ReferencedTableColumns: parentCols,
OnUpdate: ForeignKeyReferentialAction(fk.OnUpdate()),
Expand Down Expand Up @@ -181,9 +191,9 @@ func serializeFlatbufferForeignKeys(fkc *ForeignKeyCollection) types.SerialMessa
}
parentCols = serializeUint64Vector(b, fk.ReferencedTableColumns)
childCols = serializeUint64Vector(b, fk.TableColumns)
parentTable = b.CreateString(fk.ReferencedTableName)
parentTable = b.CreateString(encodeTableNameForSerialization(fk.ReferencedTableName))
parentIndex = b.CreateString(fk.ReferencedTableIndex)
childTable = b.CreateString(fk.TableName)
childTable = b.CreateString(encodeTableNameForSerialization(fk.TableName))
childIndex = b.CreateString(fk.TableIndex)
foreignKeyName = b.CreateString(fk.Name)

Expand Down
Loading
Loading