Skip to content

Commit

Permalink
feat: Allow setting of default schema version (#1888)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #1884

## Description

Allows setting of default schema version, allowing rapid switching
between (application) api/database versions. It also allows them to
define and apply schema updates eagerly, and then make the switch at a
later date.
  • Loading branch information
AndrewSisley authored Sep 25, 2023
1 parent 667023e commit db1f41c
Show file tree
Hide file tree
Showing 15 changed files with 656 additions and 40 deletions.
5 changes: 4 additions & 1 deletion api/http/handlerfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,10 @@ func patchSchemaHandler(rw http.ResponseWriter, req *http.Request) {
return
}

err = db.PatchSchema(req.Context(), string(patch))
// Hardcode setDefault to true here, as that preserves the existing behaviour.
// This function will be ripped out very shortly and I don't think it is worth
// spending time/thought here. The new http api handles this correctly.
err = db.PatchSchema(req.Context(), string(patch), true)
if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
Expand Down
14 changes: 12 additions & 2 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ type Store interface {
AddSchema(context.Context, string) ([]CollectionDescription, error)

// PatchSchema takes the given JSON patch string and applies it to the set of CollectionDescriptions
// present in the database.
// present in the database. If true is provided, the new schema versions will be made default, otherwise
// [SetDefaultSchemaVersion] should be called to set them so.
//
// It will also update the GQL types used by the query system. It will error and not apply any of the
// requested, valid updates should the net result of the patch result in an invalid state. The
Expand All @@ -109,7 +110,16 @@ type Store interface {
//
// Field [FieldKind] values may be provided in either their raw integer form, or as string as per
// [FieldKindStringToEnumMapping].
PatchSchema(context.Context, string) error
PatchSchema(context.Context, string, bool) error

// SetDefaultSchemaVersion sets the default schema version to the ID provided. It will be applied to all
// collections using the schema.
//
// This will affect all operations interacting with the schema where a schema version is not explicitly
// provided. This includes GQL queries and Collection operations.
//
// It will return an error if the provided schema version ID does not exist.
SetDefaultSchemaVersion(context.Context, string) error

// SetMigration sets the migration for the given source-destination schema version IDs. Is equivilent to
// calling `LensRegistry().SetMigration(ctx, cfg)`.
Expand Down
64 changes: 54 additions & 10 deletions client/mocks/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 49 additions & 12 deletions db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (db *db) updateCollection(
existingDescriptionsByName map[string]client.CollectionDescription,
proposedDescriptionsByName map[string]client.CollectionDescription,
desc client.CollectionDescription,
setAsDefaultVersion bool,
) (client.Collection, error) {
hasChanged, err := db.validateUpdateCollection(ctx, txn, existingDescriptionsByName, proposedDescriptionsByName, desc)
if err != nil {
Expand Down Expand Up @@ -300,24 +301,19 @@ func (db *db) updateCollection(
return nil, err
}

collectionSchemaKey := core.NewCollectionSchemaKey(desc.Schema.SchemaID)
err = txn.Systemstore().Put(ctx, collectionSchemaKey.ToDS(), []byte(schemaVersionID))
if err != nil {
return nil, err
}

collectionKey := core.NewCollectionKey(desc.Name)
err = txn.Systemstore().Put(ctx, collectionKey.ToDS(), []byte(schemaVersionID))
if err != nil {
return nil, err
}

schemaVersionHistoryKey := core.NewSchemaHistoryKey(desc.Schema.SchemaID, previousSchemaVersionID)
err = txn.Systemstore().Put(ctx, schemaVersionHistoryKey.ToDS(), []byte(schemaVersionID))
if err != nil {
return nil, err
}

if setAsDefaultVersion {
err = db.setDefaultSchemaVersionExplicit(ctx, txn, desc.Name, desc.Schema.SchemaID, schemaVersionID)
if err != nil {
return nil, err
}
}

return db.getCollectionByName(ctx, txn, desc.Name)
}

Expand Down Expand Up @@ -591,6 +587,47 @@ func validateUpdateCollectionIndexes(
return false, nil
}

func (db *db) setDefaultSchemaVersion(
ctx context.Context,
txn datastore.Txn,
schemaVersionID string,
) error {
col, err := db.getCollectionByVersionID(ctx, txn, schemaVersionID)
if err != nil {
return err
}

desc := col.Description()
err = db.setDefaultSchemaVersionExplicit(ctx, txn, desc.Name, desc.Schema.SchemaID, schemaVersionID)
if err != nil {
return err
}

cols, err := db.getCollectionDescriptions(ctx, txn)
if err != nil {
return err
}

return db.parser.SetSchema(ctx, txn, cols)
}

func (db *db) setDefaultSchemaVersionExplicit(
ctx context.Context,
txn datastore.Txn,
collectionName string,
schemaID string,
schemaVersionID string,
) error {
collectionSchemaKey := core.NewCollectionSchemaKey(schemaID)
err := txn.Systemstore().Put(ctx, collectionSchemaKey.ToDS(), []byte(schemaVersionID))
if err != nil {
return err
}

collectionKey := core.NewCollectionKey(collectionName)
return txn.Systemstore().Put(ctx, collectionKey.ToDS(), []byte(schemaVersionID))
}

// getCollectionByVersionId returns the [*collection] at the given [schemaVersionId] version.
//
// Will return an error if the given key is empty, or not found.
Expand Down
5 changes: 3 additions & 2 deletions db/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (db *db) getCollectionDescriptions(
// The collections (including the schema version ID) will only be updated if any changes have actually
// been made, if the net result of the patch matches the current persisted description then no changes
// will be applied.
func (db *db) patchSchema(ctx context.Context, txn datastore.Txn, patchString string) error {
func (db *db) patchSchema(ctx context.Context, txn datastore.Txn, patchString string, setAsDefaultVersion bool) error {
patch, err := jsonpatch.DecodePatch([]byte(patchString))
if err != nil {
return err
Expand Down Expand Up @@ -144,10 +144,11 @@ func (db *db) patchSchema(ctx context.Context, txn datastore.Txn, patchString st
}

for i, desc := range newDescriptions {
col, err := db.updateCollection(ctx, txn, collectionsByName, newDescriptionsByName, desc)
col, err := db.updateCollection(ctx, txn, collectionsByName, newDescriptionsByName, desc, setAsDefaultVersion)
if err != nil {
return err
}

newDescriptions[i] = col.Description()
}

Expand Down
27 changes: 23 additions & 4 deletions db/txn_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,14 +250,14 @@ func (db *explicitTxnDB) AddSchema(ctx context.Context, schemaString string) ([]
// The collections (including the schema version ID) will only be updated if any changes have actually
// been made, if the net result of the patch matches the current persisted description then no changes
// will be applied.
func (db *implicitTxnDB) PatchSchema(ctx context.Context, patchString string) error {
func (db *implicitTxnDB) PatchSchema(ctx context.Context, patchString string, setAsDefaultVersion bool) error {
txn, err := db.NewTxn(ctx, false)
if err != nil {
return err
}
defer txn.Discard(ctx)

err = db.patchSchema(ctx, txn, patchString)
err = db.patchSchema(ctx, txn, patchString, setAsDefaultVersion)
if err != nil {
return err
}
Expand All @@ -276,8 +276,27 @@ func (db *implicitTxnDB) PatchSchema(ctx context.Context, patchString string) er
// The collections (including the schema version ID) will only be updated if any changes have actually
// been made, if the net result of the patch matches the current persisted description then no changes
// will be applied.
func (db *explicitTxnDB) PatchSchema(ctx context.Context, patchString string) error {
return db.patchSchema(ctx, db.txn, patchString)
func (db *explicitTxnDB) PatchSchema(ctx context.Context, patchString string, setAsDefaultVersion bool) error {
return db.patchSchema(ctx, db.txn, patchString, setAsDefaultVersion)
}

func (db *implicitTxnDB) SetDefaultSchemaVersion(ctx context.Context, schemaVersionID string) error {
txn, err := db.NewTxn(ctx, false)
if err != nil {
return err
}
defer txn.Discard(ctx)

err = db.setDefaultSchemaVersion(ctx, txn, schemaVersionID)
if err != nil {
return err
}

return txn.Commit(ctx)
}

func (db *explicitTxnDB) SetDefaultSchemaVersion(ctx context.Context, schemaVersionID string) error {
return db.setDefaultSchemaVersion(ctx, db.txn, schemaVersionID)
}

func (db *implicitTxnDB) SetMigration(ctx context.Context, cfg client.LensConfig) error {
Expand Down
25 changes: 23 additions & 2 deletions http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,31 @@ func (c *Client) AddSchema(ctx context.Context, schema string) ([]client.Collect
return cols, nil
}

func (c *Client) PatchSchema(ctx context.Context, patch string) error {
type patchSchemaRequest struct {
Patch string
SetAsDefaultVersion bool
}

func (c *Client) PatchSchema(ctx context.Context, patch string, setAsDefaultVersion bool) error {
methodURL := c.http.baseURL.JoinPath("schema")

req, err := http.NewRequestWithContext(ctx, http.MethodPatch, methodURL.String(), strings.NewReader(patch))
body, err := json.Marshal(patchSchemaRequest{patch, setAsDefaultVersion})
if err != nil {
return err
}

req, err := http.NewRequestWithContext(ctx, http.MethodPatch, methodURL.String(), bytes.NewBuffer(body))
if err != nil {
return err
}
_, err = c.http.request(req)
return err
}

func (c *Client) SetDefaultSchemaVersion(ctx context.Context, schemaVersionID string) error {
methodURL := c.http.baseURL.JoinPath("schema", "default")

req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), strings.NewReader(schemaVersionID))
if err != nil {
return err
}
Expand Down
22 changes: 20 additions & 2 deletions http/handler_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,30 @@ func (s *storeHandler) AddSchema(rw http.ResponseWriter, req *http.Request) {
func (s *storeHandler) PatchSchema(rw http.ResponseWriter, req *http.Request) {
store := req.Context().Value(storeContextKey).(client.Store)

patch, err := io.ReadAll(req.Body)
var message patchSchemaRequest
err := requestJSON(req, &message)
if err != nil {
responseJSON(rw, http.StatusBadRequest, errorResponse{err})
return
}
err = store.PatchSchema(req.Context(), string(patch))

err = store.PatchSchema(req.Context(), message.Patch, message.SetAsDefaultVersion)
if err != nil {
responseJSON(rw, http.StatusBadRequest, errorResponse{err})
return
}
rw.WriteHeader(http.StatusOK)
}

func (s *storeHandler) SetDefaultSchemaVersion(rw http.ResponseWriter, req *http.Request) {
store := req.Context().Value(storeContextKey).(client.Store)

schemaVersionID, err := io.ReadAll(req.Body)
if err != nil {
responseJSON(rw, http.StatusBadRequest, errorResponse{err})
return
}
err = store.SetDefaultSchemaVersion(req.Context(), string(schemaVersionID))
if err != nil {
responseJSON(rw, http.StatusBadRequest, errorResponse{err})
return
Expand Down
1 change: 1 addition & 0 deletions http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func NewServer(db client.DB) *Server {
api.Route("/schema", func(schema chi.Router) {
schema.Post("/", store_handler.AddSchema)
schema.Patch("/", store_handler.PatchSchema)
schema.Post("/default", store_handler.SetDefaultSchemaVersion)
})
api.Route("/collections", func(collections chi.Router) {
collections.Get("/", store_handler.GetCollection)
Expand Down
8 changes: 6 additions & 2 deletions http/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ func (w *Wrapper) AddSchema(ctx context.Context, schema string) ([]client.Collec
return w.client.AddSchema(ctx, schema)
}

func (w *Wrapper) PatchSchema(ctx context.Context, patch string) error {
return w.client.PatchSchema(ctx, patch)
func (w *Wrapper) PatchSchema(ctx context.Context, patch string, setAsDefaultVersion bool) error {
return w.client.PatchSchema(ctx, patch, setAsDefaultVersion)
}

func (w *Wrapper) SetDefaultSchemaVersion(ctx context.Context, schemaVersionID string) error {
return w.client.SetDefaultSchemaVersion(ctx, schemaVersionID)
}

func (w *Wrapper) SetMigration(ctx context.Context, config client.LensConfig) error {
Expand Down
Loading

0 comments on commit db1f41c

Please sign in to comment.