Skip to content

Commit

Permalink
DGS-8011 Add cache for GetSchemaMetadata (#1042)
Browse files Browse the repository at this point in the history
* DGS-8011 Add cache for GetSchemaMetadata

This is a port of the corresponding functionality in the Java client
at confluentinc/schema-registry#2658

* Minor fix
  • Loading branch information
rayokota authored Aug 21, 2023
1 parent 22808ab commit abc3e18
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 146 deletions.
138 changes: 69 additions & 69 deletions schemaregistry/mock_schemaregistry_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ type idCacheEntry struct {
/* HTTP(S) Schema Registry Client and schema caches */
type mockclient struct {
sync.Mutex
url *url.URL
schemaCache map[subjectJSON]idCacheEntry
schemaCacheLock sync.RWMutex
idCache map[subjectID]*SchemaInfo
idCacheLock sync.RWMutex
versionCache map[subjectJSON]versionCacheEntry
versionCacheLock sync.RWMutex
compatibilityCache map[string]Compatibility
compatibilityCacheLock sync.RWMutex
counter counter
url *url.URL
schemaToIdCache map[subjectJSON]idCacheEntry
schemaToIdCacheLock sync.RWMutex
idToSchemaCache map[subjectID]*SchemaInfo
idToSchemaCacheLock sync.RWMutex
schemaToVersionCache map[subjectJSON]versionCacheEntry
schemaToVersionCacheLock sync.RWMutex
compatibilityCache map[string]Compatibility
compatibilityCacheLock sync.RWMutex
counter counter
}

var _ Client = new(mockclient)
Expand All @@ -77,12 +77,12 @@ func (c *mockclient) Register(subject string, schema SchemaInfo, normalize bool)
subject: subject,
json: string(schemaJSON),
}
c.schemaCacheLock.RLock()
idCacheEntryVal, ok := c.schemaCache[cacheKey]
c.schemaToIdCacheLock.RLock()
idCacheEntryVal, ok := c.schemaToIdCache[cacheKey]
if idCacheEntryVal.softDeleted {
ok = false
}
c.schemaCacheLock.RUnlock()
c.schemaToIdCacheLock.RUnlock()
if ok {
return idCacheEntryVal.id, nil
}
Expand All @@ -91,22 +91,22 @@ func (c *mockclient) Register(subject string, schema SchemaInfo, normalize bool)
if err != nil {
return -1, err
}
c.schemaCacheLock.Lock()
c.schemaCache[cacheKey] = idCacheEntry{id, false}
c.schemaCacheLock.Unlock()
c.schemaToIdCacheLock.Lock()
c.schemaToIdCache[cacheKey] = idCacheEntry{id, false}
c.schemaToIdCacheLock.Unlock()
return id, nil
}

func (c *mockclient) getIDFromRegistry(subject string, schema SchemaInfo) (int, error) {
var id = -1
c.idCacheLock.RLock()
for key, value := range c.idCache {
c.idToSchemaCacheLock.RLock()
for key, value := range c.idToSchemaCache {
if key.subject == subject && schemasEqual(*value, schema) {
id = key.id
break
}
}
c.idCacheLock.RUnlock()
c.idToSchemaCacheLock.RUnlock()
err := c.generateVersion(subject, schema)
if err != nil {
return -1, err
Expand All @@ -117,9 +117,9 @@ func (c *mockclient) getIDFromRegistry(subject string, schema SchemaInfo) (int,
subject: subject,
id: id,
}
c.idCacheLock.Lock()
c.idCache[idCacheKey] = &schema
c.idCacheLock.Unlock()
c.idToSchemaCacheLock.Lock()
c.idToSchemaCache[idCacheKey] = &schema
c.idToSchemaCacheLock.Unlock()
}
return id, nil
}
Expand All @@ -140,9 +140,9 @@ func (c *mockclient) generateVersion(subject string, schema SchemaInfo) error {
subject: subject,
json: string(schemaJSON),
}
c.versionCacheLock.Lock()
c.versionCache[cacheKey] = versionCacheEntry{newVersion, false}
c.versionCacheLock.Unlock()
c.schemaToVersionCacheLock.Lock()
c.schemaToVersionCache[cacheKey] = versionCacheEntry{newVersion, false}
c.schemaToVersionCacheLock.Unlock()
return nil
}

Expand All @@ -153,9 +153,9 @@ func (c *mockclient) GetBySubjectAndID(subject string, id int) (schema SchemaInf
subject: subject,
id: id,
}
c.idCacheLock.RLock()
info, ok := c.idCache[cacheKey]
c.idCacheLock.RUnlock()
c.idToSchemaCacheLock.RLock()
info, ok := c.idToSchemaCache[cacheKey]
c.idToSchemaCacheLock.RUnlock()
if ok {
return *info, nil
}
Expand All @@ -177,12 +177,12 @@ func (c *mockclient) GetID(subject string, schema SchemaInfo, normalize bool) (i
subject: subject,
json: string(schemaJSON),
}
c.schemaCacheLock.RLock()
idCacheEntryVal, ok := c.schemaCache[cacheKey]
c.schemaToIdCacheLock.RLock()
idCacheEntryVal, ok := c.schemaToIdCache[cacheKey]
if idCacheEntryVal.softDeleted {
ok = false
}
c.schemaCacheLock.RUnlock()
c.schemaToIdCacheLock.RUnlock()
if ok {
return idCacheEntryVal.id, nil
}
Expand Down Expand Up @@ -214,14 +214,14 @@ func (c *mockclient) GetLatestSchemaMetadata(subject string) (result SchemaMetad
// Returns SchemaMetadata object
func (c *mockclient) GetSchemaMetadata(subject string, version int) (result SchemaMetadata, err error) {
var json string
c.versionCacheLock.RLock()
for key, value := range c.versionCache {
c.schemaToVersionCacheLock.RLock()
for key, value := range c.schemaToVersionCache {
if key.subject == subject && value.version == version && !value.softDeleted {
json = key.json
break
}
}
c.versionCacheLock.RUnlock()
c.schemaToVersionCacheLock.RUnlock()
if json == "" {
posErr := url.Error{
Op: "GET",
Expand All @@ -237,14 +237,14 @@ func (c *mockclient) GetSchemaMetadata(subject string, version int) (result Sche
return SchemaMetadata{}, err
}
var id = -1
c.idCacheLock.RLock()
for key, value := range c.idCache {
c.idToSchemaCacheLock.RLock()
for key, value := range c.idToSchemaCache {
if key.subject == subject && schemasEqual(*value, info) {
id = key.id
break
}
}
c.idCacheLock.RUnlock()
c.idToSchemaCacheLock.RUnlock()
if id == -1 {
posErr := url.Error{
Op: "GET",
Expand Down Expand Up @@ -279,13 +279,13 @@ func (c *mockclient) GetAllVersions(subject string) (results []int, err error) {

func (c *mockclient) allVersions(subject string) (results []int) {
versions := make([]int, 0)
c.versionCacheLock.RLock()
for key, value := range c.versionCache {
c.schemaToVersionCacheLock.RLock()
for key, value := range c.schemaToVersionCache {
if key.subject == subject && !value.softDeleted {
versions = append(versions, value.version)
}
}
c.versionCacheLock.RUnlock()
c.schemaToVersionCacheLock.RUnlock()
sort.Ints(versions)
return versions
}
Expand All @@ -300,17 +300,17 @@ func (c *mockclient) latestVersion(subject string) int {

func (c *mockclient) deleteVersion(key subjectJSON, version int, permanent bool) {
if permanent {
delete(c.versionCache, key)
delete(c.schemaToVersionCache, key)
} else {
c.versionCache[key] = versionCacheEntry{version, true}
c.schemaToVersionCache[key] = versionCacheEntry{version, true}
}
}

func (c *mockclient) deleteID(key subjectJSON, id int, permanent bool) {
if permanent {
delete(c.schemaCache, key)
delete(c.schemaToIdCache, key)
} else {
c.schemaCache[key] = idCacheEntry{id, true}
c.schemaToIdCache[key] = idCacheEntry{id, true}
}
}

Expand All @@ -325,12 +325,12 @@ func (c *mockclient) GetVersion(subject string, schema SchemaInfo, normalize boo
subject: subject,
json: string(schemaJSON),
}
c.versionCacheLock.RLock()
versionCacheEntryVal, ok := c.versionCache[cacheKey]
c.schemaToVersionCacheLock.RLock()
versionCacheEntryVal, ok := c.schemaToVersionCache[cacheKey]
if versionCacheEntryVal.softDeleted {
ok = false
}
c.versionCacheLock.RUnlock()
c.schemaToVersionCacheLock.RUnlock()
if ok {
return versionCacheEntryVal.version, nil
}
Expand All @@ -346,80 +346,80 @@ func (c *mockclient) GetVersion(subject string, schema SchemaInfo, normalize boo
// Returns a string slice containing all registered subjects
func (c *mockclient) GetAllSubjects() ([]string, error) {
subjects := make([]string, 0)
c.versionCacheLock.RLock()
for key, value := range c.versionCache {
c.schemaToVersionCacheLock.RLock()
for key, value := range c.schemaToVersionCache {
if !value.softDeleted {
subjects = append(subjects, key.subject)
}
}
c.versionCacheLock.RUnlock()
c.schemaToVersionCacheLock.RUnlock()
sort.Strings(subjects)
return subjects, nil
}

// Deletes provided Subject from registry
// Returns integer slice of versions removed by delete
func (c *mockclient) DeleteSubject(subject string, permanent bool) (deleted []int, err error) {
c.schemaCacheLock.Lock()
for key, value := range c.schemaCache {
c.schemaToIdCacheLock.Lock()
for key, value := range c.schemaToIdCache {
if key.subject == subject && (!value.softDeleted || permanent) {
c.deleteID(key, value.id, permanent)
}
}
c.schemaCacheLock.Unlock()
c.versionCacheLock.Lock()
for key, value := range c.versionCache {
c.schemaToIdCacheLock.Unlock()
c.schemaToVersionCacheLock.Lock()
for key, value := range c.schemaToVersionCache {
if key.subject == subject && (!value.softDeleted || permanent) {
c.deleteVersion(key, value.version, permanent)
deleted = append(deleted, value.version)
}
}
c.versionCacheLock.Unlock()
c.schemaToVersionCacheLock.Unlock()
c.compatibilityCacheLock.Lock()
delete(c.compatibilityCache, subject)
c.compatibilityCacheLock.Unlock()
if permanent {
c.idCacheLock.Lock()
for key := range c.idCache {
c.idToSchemaCacheLock.Lock()
for key := range c.idToSchemaCache {
if key.subject == subject {
delete(c.idCache, key)
delete(c.idToSchemaCache, key)
}
}
c.idCacheLock.Unlock()
c.idToSchemaCacheLock.Unlock()
}
return deleted, nil
}

// DeleteSubjectVersion removes the version identified by delete from the subject's registration
// Returns integer id for the deleted version
func (c *mockclient) DeleteSubjectVersion(subject string, version int, permanent bool) (deleted int, err error) {
c.versionCacheLock.Lock()
for key, value := range c.versionCache {
c.schemaToVersionCacheLock.Lock()
for key, value := range c.schemaToVersionCache {
if key.subject == subject && value.version == version {
c.deleteVersion(key, value.version, permanent)
schemaJSON := key.json
cacheKeySchema := subjectJSON{
subject: subject,
json: string(schemaJSON),
}
c.schemaCacheLock.Lock()
idSchemaEntryVal, ok := c.schemaCache[cacheKeySchema]
c.schemaToIdCacheLock.Lock()
idSchemaEntryVal, ok := c.schemaToIdCache[cacheKeySchema]
if ok {
c.deleteID(key, idSchemaEntryVal.id, permanent)
}
c.schemaCacheLock.Unlock()
c.schemaToIdCacheLock.Unlock()
if permanent && ok {
c.idCacheLock.Lock()
c.idToSchemaCacheLock.Lock()
cacheKeyID := subjectID{
subject: subject,
id: idSchemaEntryVal.id,
}
delete(c.idCache, cacheKeyID)
c.idCacheLock.Unlock()
delete(c.idToSchemaCache, cacheKeyID)
c.idToSchemaCacheLock.Unlock()
}
}
}
c.versionCacheLock.Unlock()
c.schemaToVersionCacheLock.Unlock()
return version, nil
}

Expand Down
Loading

1 comment on commit abc3e18

@daniel-yliu
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi do you know when this PR can be released.

Would it solve the slow serialization when having UseLatestVersion = true?

Please sign in to comment.