Skip to content

Commit

Permalink
feat: Remove CollectionDescription.Schema (sourcenetwork#1965)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves sourcenetwork#1958

## Description

Removes CollectionDescription.Schema.

Also splits the storage of schema out from within collection.

The storage of schema has been broken out to a new sub-package of db. At
the moment it is a very simple file, but collection will be moved there
in sourcenetwork#1964. I was planning
on doing that in this PR (in part to provide context for reviewers, as
at the moment it is basically a single-file package), but it proved to be
non-trivial due to some existing messiness in that space and was broken
out into two more tasks.

I would also like the code in that directory to eventually follow a
repository-like pattern, where data is cached (within a context/txn's
context) instead of being fetched from the store on each call.

Moving this stuff out to a new directory instead of preserving it in the
(already very large) db directory should make both db and the new
sub-package a fair bit more cohesive and easier to read.
  • Loading branch information
AndrewSisley authored Oct 17, 2023
1 parent 7e5f33d commit e95510f
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 309 deletions.
6 changes: 2 additions & 4 deletions client/descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ type CollectionDescription struct {
// It is immutable.
ID uint32

// Schema contains the data type information that this Collection uses.
//
// This property is deprecated and should not be used.
Schema SchemaDescription
// The ID of the schema version that this collection is at.
SchemaVersionID string

// Indexes contains the secondary indexes that this Collection has.
Indexes []IndexDescription
Expand Down
37 changes: 37 additions & 0 deletions core/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
COLLECTION_SCHEMA_VERSION_HISTORY = "/collection/version/h"
COLLECTION_INDEX = "/collection/index"
SCHEMA_MIGRATION = "/schema/migration"
SCHEMA_VERSION = "/schema/version"
SEQ = "/seq"
PRIMARY_KEY = "/pk"
DATASTORE_DOC_VERSION_FIELD_ID = "v"
Expand Down Expand Up @@ -132,6 +133,15 @@ type CollectionIndexKey struct {

var _ Key = (*CollectionIndexKey)(nil)

// SchemaVersionKey points to the json serialized schema at the specified version.
//
// Its corresponding value is immutable.
type SchemaVersionKey struct {
// SchemaVersionID is the ID of the schema version this key points to.
SchemaVersionID string
}

var _ Key = (*SchemaVersionKey)(nil)

// SchemaHistoryKey holds the pathway through the schema version history for
// any given schema.
//
Expand Down Expand Up @@ -257,6 +267,11 @@ func NewCollectionSchemaVersionKey(schemaVersionId string) CollectionSchemaVersi
return CollectionSchemaVersionKey{SchemaVersionId: schemaVersionId}
}

// NewCollectionSchemaVersionKeyFromString builds a CollectionSchemaVersionKey from
// the string form of a key, using the text after the final "/" as the schema
// version ID.
//
// A key that contains no "/" is used as the schema version ID in its entirety.
func NewCollectionSchemaVersionKeyFromString(key string) CollectionSchemaVersionKey {
	// LastIndex yields -1 when there is no separator, so the slice below then
	// starts at 0 and keeps the whole key.
	lastSeparator := strings.LastIndex(key, "/")
	return CollectionSchemaVersionKey{SchemaVersionId: key[lastSeparator+1:]}
}

// NewCollectionIndexKey creates a new CollectionIndexKey from a collection name and index name.
func NewCollectionIndexKey(colID, indexName string) CollectionIndexKey {
return CollectionIndexKey{CollectionName: colID, IndexName: indexName}
Expand Down Expand Up @@ -307,6 +322,10 @@ func (k CollectionIndexKey) ToDS() ds.Key {
return ds.NewKey(k.ToString())
}

// NewSchemaVersionKey creates a new SchemaVersionKey for the given schema version ID.
func NewSchemaVersionKey(schemaVersionID string) SchemaVersionKey {
return SchemaVersionKey{SchemaVersionID: schemaVersionID}
}

func NewSchemaHistoryKey(schemaId string, previousSchemaVersionID string) SchemaHistoryKey {
return SchemaHistoryKey{
SchemaID: schemaId,
Expand Down Expand Up @@ -625,6 +644,24 @@ func (k CollectionSchemaVersionKey) ToDS() ds.Key {
return ds.NewKey(k.ToString())
}

// ToString returns the string form of the key: the SCHEMA_VERSION prefix,
// followed by "/" and the schema version ID when one is set.
//
// A key with an empty SchemaVersionID yields the bare prefix.
func (k SchemaVersionKey) ToString() string {
	if k.SchemaVersionID == "" {
		return SCHEMA_VERSION
	}

	return SCHEMA_VERSION + "/" + k.SchemaVersionID
}

// Bytes returns the string form of the key, as produced by ToString, converted
// to a byte slice.
func (k SchemaVersionKey) Bytes() []byte {
return []byte(k.ToString())
}

// ToDS returns the key as a ds.Key, built by passing the key's string form
// (see ToString) to ds.NewKey.
func (k SchemaVersionKey) ToDS() ds.Key {
return ds.NewKey(k.ToString())
}

func (k SchemaHistoryKey) ToString() string {
result := COLLECTION_SCHEMA_VERSION_HISTORY

Expand Down
97 changes: 30 additions & 67 deletions db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ import (
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/client/request"
"github.com/sourcenetwork/defradb/core"
ccid "github.com/sourcenetwork/defradb/core/cid"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/db/base"
"github.com/sourcenetwork/defradb/db/description"
"github.com/sourcenetwork/defradb/db/fetcher"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/events"
"github.com/sourcenetwork/defradb/lens"
"github.com/sourcenetwork/defradb/logging"
"github.com/sourcenetwork/defradb/merkle/crdt"
)

Expand Down Expand Up @@ -119,67 +118,41 @@ func (db *db) createCollection(
}
desc.ID = uint32(colID)

for i := range schema.Fields {
schema.Fields[i].ID = client.FieldID(i)
}

col, err := db.newCollection(desc, schema)
if err != nil {
return nil, err
}

// Local elements such as secondary indexes should be excluded
// from the (global) schemaId.
schemaBuf, err := json.Marshal(schema)
schema, err = description.CreateSchemaVersion(ctx, txn, schema)
if err != nil {
return nil, err
}

// add a reference to this DB by desc hash
cid, err := ccid.NewSHA256CidV1(schemaBuf)
if err != nil {
return nil, err
}
schemaID := cid.String()

// For new schemas the initial version id will match the schema id
schemaVersionID := schemaID

schema.VersionID = schemaVersionID
schema.SchemaID = schemaID
desc.Schema = schema
desc.SchemaVersionID = schema.VersionID

// buffer must include all the ids, as it is saved and loaded from the store later.
buf, err := json.Marshal(desc)
if err != nil {
return nil, err
}

collectionSchemaVersionKey := core.NewCollectionSchemaVersionKey(schemaVersionID)
collectionSchemaVersionKey := core.NewCollectionSchemaVersionKey(schema.VersionID)
// Whilst the schemaVersionKey is global, the data persisted at the key's location
// is local to the node (the global only elements are not useful beyond key generation).
err = txn.Systemstore().Put(ctx, collectionSchemaVersionKey.ToDS(), buf)
if err != nil {
return nil, err
}

collectionSchemaKey := core.NewCollectionSchemaKey(schemaID)
err = txn.Systemstore().Put(ctx, collectionSchemaKey.ToDS(), []byte(schemaVersionID))
collectionSchemaKey := core.NewCollectionSchemaKey(schema.SchemaID)
err = txn.Systemstore().Put(ctx, collectionSchemaKey.ToDS(), []byte(schema.VersionID))
if err != nil {
return nil, err
}

err = txn.Systemstore().Put(ctx, collectionKey.ToDS(), []byte(schemaVersionID))
err = txn.Systemstore().Put(ctx, collectionKey.ToDS(), []byte(schema.VersionID))
if err != nil {
return nil, err
}

log.Debug(
ctx,
"Created collection",
logging.NewKV("Name", col.Name()),
logging.NewKV("SchemaID", col.SchemaID()),
)
col, err := db.newCollection(desc, schema)
if err != nil {
return nil, err
}

for _, index := range desc.Indexes {
if _, err := col.createIndex(ctx, txn, index); err != nil {
Expand All @@ -203,12 +176,9 @@ func (db *db) updateSchema(
txn datastore.Txn,
existingSchemaByName map[string]client.SchemaDescription,
proposedDescriptionsByName map[string]client.SchemaDescription,
def client.CollectionDefinition,
schema client.SchemaDescription,
setAsDefaultVersion bool,
) (client.Collection, error) {
schema := def.Schema
desc := def.Description

hasChanged, err := db.validateUpdateSchema(
ctx,
txn,
Expand All @@ -221,7 +191,7 @@ func (db *db) updateSchema(
}

if !hasChanged {
return db.getCollectionByName(ctx, txn, desc.Name)
return db.getCollectionByName(ctx, txn, schema.Name)
}

for _, field := range schema.Fields {
Expand All @@ -239,56 +209,40 @@ func (db *db) updateSchema(
}

for i, field := range schema.Fields {
if field.ID == client.FieldID(0) {
// This is not wonderful and will probably break when we add the ability
// to delete fields, however it is good enough for now and matches the
// create behaviour.
field.ID = client.FieldID(i)
schema.Fields[i] = field
}

if field.Typ == client.NONE_CRDT {
// If no CRDT Type has been provided, default to LWW_REGISTER.
field.Typ = client.LWW_REGISTER
schema.Fields[i] = field
}
}

globalSchemaBuf, err := json.Marshal(schema)
schema, err = description.CreateSchemaVersion(ctx, txn, schema)
if err != nil {
return nil, err
}

cid, err := ccid.NewSHA256CidV1(globalSchemaBuf)
col, err := db.getCollectionByName(ctx, txn, schema.Name)
if err != nil {
return nil, err
}
previousSchemaVersionID := schema.VersionID
schemaVersionID := cid.String()
schema.VersionID = schemaVersionID
desc.Schema = schema
desc := col.Description()
desc.SchemaVersionID = schema.VersionID

buf, err := json.Marshal(desc)
if err != nil {
return nil, err
}

collectionSchemaVersionKey := core.NewCollectionSchemaVersionKey(schemaVersionID)
collectionSchemaVersionKey := core.NewCollectionSchemaVersionKey(schema.VersionID)
// Whilst the schemaVersionKey is global, the data persisted at the key's location
// is local to the node (the global only elements are not useful beyond key generation).
err = txn.Systemstore().Put(ctx, collectionSchemaVersionKey.ToDS(), buf)
if err != nil {
return nil, err
}

schemaVersionHistoryKey := core.NewSchemaHistoryKey(schema.SchemaID, previousSchemaVersionID)
err = txn.Systemstore().Put(ctx, schemaVersionHistoryKey.ToDS(), []byte(schemaVersionID))
if err != nil {
return nil, err
}

if setAsDefaultVersion {
err = db.setDefaultSchemaVersionExplicit(ctx, txn, desc.Name, schema.SchemaID, schemaVersionID)
err = db.setDefaultSchemaVersionExplicit(ctx, txn, desc.Name, schema.SchemaID, schema.VersionID)
if err != nil {
return nil, err
}
Expand All @@ -308,6 +262,10 @@ func (db *db) validateUpdateSchema(
proposedDescriptionsByName map[string]client.SchemaDescription,
proposedDesc client.SchemaDescription,
) (bool, error) {
if proposedDesc.Name == "" {
return false, ErrSchemaNameEmpty
}

existingDesc, collectionExists := existingDescriptionsByName[proposedDesc.Name]
if !collectionExists {
return false, NewErrAddCollectionWithPatch(proposedDesc.Name)
Expand Down Expand Up @@ -538,7 +496,7 @@ func (db *db) setDefaultSchemaVersion(
}

desc := col.Description()
err = db.setDefaultSchemaVersionExplicit(ctx, txn, desc.Name, desc.Schema.SchemaID, schemaVersionID)
err = db.setDefaultSchemaVersionExplicit(ctx, txn, desc.Name, col.Schema().SchemaID, schemaVersionID)
if err != nil {
return err
}
Expand Down Expand Up @@ -597,11 +555,16 @@ func (db *db) getCollectionByVersionID(
return nil, err
}

schema, err := description.GetSchemaVersion(ctx, txn, desc.SchemaVersionID)
if err != nil {
return nil, err
}

col := &collection{
db: db,
def: client.CollectionDefinition{
Description: desc,
Schema: desc.Schema,
Schema: schema,
},
}

Expand Down
30 changes: 30 additions & 0 deletions db/description/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2023 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package description

import "github.com/sourcenetwork/defradb/errors"

// Messages for the errors constructed in this file; each is passed to
// errors.Wrap together with the underlying error.
const (
errFailedToCreateSchemaQuery string = "failed to create schema prefix query"
errFailedToCloseSchemaQuery string = "failed to close schema prefix query"
)

// NewErrFailedToCreateSchemaQuery returns a new error indicating that the
// schema prefix query could not be created.
//
// The given inner error is passed to errors.Wrap along with the message.
func NewErrFailedToCreateSchemaQuery(inner error) error {
return errors.Wrap(errFailedToCreateSchemaQuery, inner)
}

// NewErrFailedToCloseSchemaQuery returns a new error indicating that the
// schema prefix query could not be closed.
//
// The given inner error is passed to errors.Wrap along with the message.
func NewErrFailedToCloseSchemaQuery(inner error) error {
return errors.Wrap(errFailedToCloseSchemaQuery, inner)
}
Loading

0 comments on commit e95510f

Please sign in to comment.