Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DGS-8011 Add cache for GetSchemaMetadata #1042

Merged
merged 2 commits into from
Aug 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 69 additions & 69 deletions schemaregistry/mock_schemaregistry_client.go
Original file line number Diff line number Diff line change
@@ -53,16 +53,16 @@ type idCacheEntry struct {
/* HTTP(S) Schema Registry Client and schema caches */
type mockclient struct {
sync.Mutex
url *url.URL
schemaCache map[subjectJSON]idCacheEntry
schemaCacheLock sync.RWMutex
idCache map[subjectID]*SchemaInfo
idCacheLock sync.RWMutex
versionCache map[subjectJSON]versionCacheEntry
versionCacheLock sync.RWMutex
compatibilityCache map[string]Compatibility
compatibilityCacheLock sync.RWMutex
counter counter
url *url.URL
schemaToIdCache map[subjectJSON]idCacheEntry
schemaToIdCacheLock sync.RWMutex
idToSchemaCache map[subjectID]*SchemaInfo
idToSchemaCacheLock sync.RWMutex
schemaToVersionCache map[subjectJSON]versionCacheEntry
schemaToVersionCacheLock sync.RWMutex
compatibilityCache map[string]Compatibility
compatibilityCacheLock sync.RWMutex
counter counter
}

var _ Client = new(mockclient)
@@ -77,12 +77,12 @@ func (c *mockclient) Register(subject string, schema SchemaInfo, normalize bool)
subject: subject,
json: string(schemaJSON),
}
c.schemaCacheLock.RLock()
idCacheEntryVal, ok := c.schemaCache[cacheKey]
c.schemaToIdCacheLock.RLock()
idCacheEntryVal, ok := c.schemaToIdCache[cacheKey]
if idCacheEntryVal.softDeleted {
ok = false
}
c.schemaCacheLock.RUnlock()
c.schemaToIdCacheLock.RUnlock()
if ok {
return idCacheEntryVal.id, nil
}
@@ -91,22 +91,22 @@ func (c *mockclient) Register(subject string, schema SchemaInfo, normalize bool)
if err != nil {
return -1, err
}
c.schemaCacheLock.Lock()
c.schemaCache[cacheKey] = idCacheEntry{id, false}
c.schemaCacheLock.Unlock()
c.schemaToIdCacheLock.Lock()
c.schemaToIdCache[cacheKey] = idCacheEntry{id, false}
c.schemaToIdCacheLock.Unlock()
return id, nil
}

func (c *mockclient) getIDFromRegistry(subject string, schema SchemaInfo) (int, error) {
var id = -1
c.idCacheLock.RLock()
for key, value := range c.idCache {
c.idToSchemaCacheLock.RLock()
for key, value := range c.idToSchemaCache {
if key.subject == subject && schemasEqual(*value, schema) {
id = key.id
break
}
}
c.idCacheLock.RUnlock()
c.idToSchemaCacheLock.RUnlock()
err := c.generateVersion(subject, schema)
if err != nil {
return -1, err
@@ -117,9 +117,9 @@ func (c *mockclient) getIDFromRegistry(subject string, schema SchemaInfo) (int,
subject: subject,
id: id,
}
c.idCacheLock.Lock()
c.idCache[idCacheKey] = &schema
c.idCacheLock.Unlock()
c.idToSchemaCacheLock.Lock()
c.idToSchemaCache[idCacheKey] = &schema
c.idToSchemaCacheLock.Unlock()
}
return id, nil
}
@@ -140,9 +140,9 @@ func (c *mockclient) generateVersion(subject string, schema SchemaInfo) error {
subject: subject,
json: string(schemaJSON),
}
c.versionCacheLock.Lock()
c.versionCache[cacheKey] = versionCacheEntry{newVersion, false}
c.versionCacheLock.Unlock()
c.schemaToVersionCacheLock.Lock()
c.schemaToVersionCache[cacheKey] = versionCacheEntry{newVersion, false}
c.schemaToVersionCacheLock.Unlock()
return nil
}

@@ -153,9 +153,9 @@ func (c *mockclient) GetBySubjectAndID(subject string, id int) (schema SchemaInf
subject: subject,
id: id,
}
c.idCacheLock.RLock()
info, ok := c.idCache[cacheKey]
c.idCacheLock.RUnlock()
c.idToSchemaCacheLock.RLock()
info, ok := c.idToSchemaCache[cacheKey]
c.idToSchemaCacheLock.RUnlock()
if ok {
return *info, nil
}
@@ -177,12 +177,12 @@ func (c *mockclient) GetID(subject string, schema SchemaInfo, normalize bool) (i
subject: subject,
json: string(schemaJSON),
}
c.schemaCacheLock.RLock()
idCacheEntryVal, ok := c.schemaCache[cacheKey]
c.schemaToIdCacheLock.RLock()
idCacheEntryVal, ok := c.schemaToIdCache[cacheKey]
if idCacheEntryVal.softDeleted {
ok = false
}
c.schemaCacheLock.RUnlock()
c.schemaToIdCacheLock.RUnlock()
if ok {
return idCacheEntryVal.id, nil
}
@@ -214,14 +214,14 @@ func (c *mockclient) GetLatestSchemaMetadata(subject string) (result SchemaMetad
// Returns SchemaMetadata object
func (c *mockclient) GetSchemaMetadata(subject string, version int) (result SchemaMetadata, err error) {
var json string
c.versionCacheLock.RLock()
for key, value := range c.versionCache {
c.schemaToVersionCacheLock.RLock()
for key, value := range c.schemaToVersionCache {
if key.subject == subject && value.version == version && !value.softDeleted {
json = key.json
break
}
}
c.versionCacheLock.RUnlock()
c.schemaToVersionCacheLock.RUnlock()
if json == "" {
posErr := url.Error{
Op: "GET",
@@ -237,14 +237,14 @@ func (c *mockclient) GetSchemaMetadata(subject string, version int) (result Sche
return SchemaMetadata{}, err
}
var id = -1
c.idCacheLock.RLock()
for key, value := range c.idCache {
c.idToSchemaCacheLock.RLock()
for key, value := range c.idToSchemaCache {
if key.subject == subject && schemasEqual(*value, info) {
id = key.id
break
}
}
c.idCacheLock.RUnlock()
c.idToSchemaCacheLock.RUnlock()
if id == -1 {
posErr := url.Error{
Op: "GET",
@@ -279,13 +279,13 @@ func (c *mockclient) GetAllVersions(subject string) (results []int, err error) {

func (c *mockclient) allVersions(subject string) (results []int) {
versions := make([]int, 0)
c.versionCacheLock.RLock()
for key, value := range c.versionCache {
c.schemaToVersionCacheLock.RLock()
for key, value := range c.schemaToVersionCache {
if key.subject == subject && !value.softDeleted {
versions = append(versions, value.version)
}
}
c.versionCacheLock.RUnlock()
c.schemaToVersionCacheLock.RUnlock()
sort.Ints(versions)
return versions
}
@@ -300,17 +300,17 @@ func (c *mockclient) latestVersion(subject string) int {

func (c *mockclient) deleteVersion(key subjectJSON, version int, permanent bool) {
if permanent {
delete(c.versionCache, key)
delete(c.schemaToVersionCache, key)
} else {
c.versionCache[key] = versionCacheEntry{version, true}
c.schemaToVersionCache[key] = versionCacheEntry{version, true}
}
}

func (c *mockclient) deleteID(key subjectJSON, id int, permanent bool) {
if permanent {
delete(c.schemaCache, key)
delete(c.schemaToIdCache, key)
} else {
c.schemaCache[key] = idCacheEntry{id, true}
c.schemaToIdCache[key] = idCacheEntry{id, true}
}
}

@@ -325,12 +325,12 @@ func (c *mockclient) GetVersion(subject string, schema SchemaInfo, normalize boo
subject: subject,
json: string(schemaJSON),
}
c.versionCacheLock.RLock()
versionCacheEntryVal, ok := c.versionCache[cacheKey]
c.schemaToVersionCacheLock.RLock()
versionCacheEntryVal, ok := c.schemaToVersionCache[cacheKey]
if versionCacheEntryVal.softDeleted {
ok = false
}
c.versionCacheLock.RUnlock()
c.schemaToVersionCacheLock.RUnlock()
if ok {
return versionCacheEntryVal.version, nil
}
@@ -346,80 +346,80 @@ func (c *mockclient) GetVersion(subject string, schema SchemaInfo, normalize boo
// Returns a string slice containing all registered subjects
func (c *mockclient) GetAllSubjects() ([]string, error) {
subjects := make([]string, 0)
c.versionCacheLock.RLock()
for key, value := range c.versionCache {
c.schemaToVersionCacheLock.RLock()
for key, value := range c.schemaToVersionCache {
if !value.softDeleted {
subjects = append(subjects, key.subject)
}
}
c.versionCacheLock.RUnlock()
c.schemaToVersionCacheLock.RUnlock()
sort.Strings(subjects)
return subjects, nil
}

// Deletes provided Subject from registry
// Returns integer slice of versions removed by delete
func (c *mockclient) DeleteSubject(subject string, permanent bool) (deleted []int, err error) {
c.schemaCacheLock.Lock()
for key, value := range c.schemaCache {
c.schemaToIdCacheLock.Lock()
for key, value := range c.schemaToIdCache {
if key.subject == subject && (!value.softDeleted || permanent) {
c.deleteID(key, value.id, permanent)
}
}
c.schemaCacheLock.Unlock()
c.versionCacheLock.Lock()
for key, value := range c.versionCache {
c.schemaToIdCacheLock.Unlock()
c.schemaToVersionCacheLock.Lock()
for key, value := range c.schemaToVersionCache {
if key.subject == subject && (!value.softDeleted || permanent) {
c.deleteVersion(key, value.version, permanent)
deleted = append(deleted, value.version)
}
}
c.versionCacheLock.Unlock()
c.schemaToVersionCacheLock.Unlock()
c.compatibilityCacheLock.Lock()
delete(c.compatibilityCache, subject)
c.compatibilityCacheLock.Unlock()
if permanent {
c.idCacheLock.Lock()
for key := range c.idCache {
c.idToSchemaCacheLock.Lock()
for key := range c.idToSchemaCache {
if key.subject == subject {
delete(c.idCache, key)
delete(c.idToSchemaCache, key)
}
}
c.idCacheLock.Unlock()
c.idToSchemaCacheLock.Unlock()
}
return deleted, nil
}

// DeleteSubjectVersion removes the version identified by delete from the subject's registration
// Returns integer id for the deleted version
func (c *mockclient) DeleteSubjectVersion(subject string, version int, permanent bool) (deleted int, err error) {
c.versionCacheLock.Lock()
for key, value := range c.versionCache {
c.schemaToVersionCacheLock.Lock()
for key, value := range c.schemaToVersionCache {
if key.subject == subject && value.version == version {
c.deleteVersion(key, value.version, permanent)
schemaJSON := key.json
cacheKeySchema := subjectJSON{
subject: subject,
json: string(schemaJSON),
}
c.schemaCacheLock.Lock()
idSchemaEntryVal, ok := c.schemaCache[cacheKeySchema]
c.schemaToIdCacheLock.Lock()
idSchemaEntryVal, ok := c.schemaToIdCache[cacheKeySchema]
if ok {
c.deleteID(key, idSchemaEntryVal.id, permanent)
}
c.schemaCacheLock.Unlock()
c.schemaToIdCacheLock.Unlock()
if permanent && ok {
c.idCacheLock.Lock()
c.idToSchemaCacheLock.Lock()
cacheKeyID := subjectID{
subject: subject,
id: idSchemaEntryVal.id,
}
delete(c.idCache, cacheKeyID)
c.idCacheLock.Unlock()
delete(c.idToSchemaCache, cacheKeyID)
c.idToSchemaCacheLock.Unlock()
}
}
}
c.versionCacheLock.Unlock()
c.schemaToVersionCacheLock.Unlock()
return version, nil
}

Loading