From 7bd88e0588df4606a3452dcc98fe7ee39a3f7e28 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Tue, 15 Aug 2023 15:26:46 -0700 Subject: [PATCH 1/2] DGS-8011 Add cache for GetSchemaMetadata This is a port of the corresponding functionality in the Java client at https://github.com/confluentinc/schema-registry/pull/2658 --- schemaregistry/mock_schemaregistry_client.go | 138 ++++++------ schemaregistry/schemaregistry_client.go | 208 ++++++++++++------- 2 files changed, 200 insertions(+), 146 deletions(-) diff --git a/schemaregistry/mock_schemaregistry_client.go b/schemaregistry/mock_schemaregistry_client.go index 04398f218..30db197e7 100644 --- a/schemaregistry/mock_schemaregistry_client.go +++ b/schemaregistry/mock_schemaregistry_client.go @@ -53,16 +53,16 @@ type idCacheEntry struct { /* HTTP(S) Schema Registry Client and schema caches */ type mockclient struct { sync.Mutex - url *url.URL - schemaCache map[subjectJSON]idCacheEntry - schemaCacheLock sync.RWMutex - idCache map[subjectID]*SchemaInfo - idCacheLock sync.RWMutex - versionCache map[subjectJSON]versionCacheEntry - versionCacheLock sync.RWMutex - compatibilityCache map[string]Compatibility - compatibilityCacheLock sync.RWMutex - counter counter + url *url.URL + schemaToIdCache map[subjectJSON]idCacheEntry + schemaToIdCacheLock sync.RWMutex + idToSchemaCache map[subjectID]*SchemaInfo + idToSchemaCacheLock sync.RWMutex + schemaToVersionCache map[subjectJSON]versionCacheEntry + schemaToVersionCacheLock sync.RWMutex + compatibilityCache map[string]Compatibility + compatibilityCacheLock sync.RWMutex + counter counter } var _ Client = new(mockclient) @@ -77,12 +77,12 @@ func (c *mockclient) Register(subject string, schema SchemaInfo, normalize bool) subject: subject, json: string(schemaJSON), } - c.schemaCacheLock.RLock() - idCacheEntryVal, ok := c.schemaCache[cacheKey] + c.schemaToIdCacheLock.RLock() + idCacheEntryVal, ok := c.schemaToIdCache[cacheKey] if idCacheEntryVal.softDeleted { ok = false } - c.schemaCacheLock.RUnlock() + c.schemaToIdCacheLock.RUnlock() if ok { return idCacheEntryVal.id, nil } @@ -91,22 +91,22 @@ func (c *mockclient) Register(subject string, schema SchemaInfo, normalize bool) if err != nil { return -1, err } - c.schemaCacheLock.Lock() - c.schemaCache[cacheKey] = idCacheEntry{id, false} - c.schemaCacheLock.Unlock() + c.schemaToIdCacheLock.Lock() + c.schemaToIdCache[cacheKey] = idCacheEntry{id, false} + c.schemaToIdCacheLock.Unlock() return id, nil } func (c *mockclient) getIDFromRegistry(subject string, schema SchemaInfo) (int, error) { var id = -1 - c.idCacheLock.RLock() - for key, value := range c.idCache { + c.idToSchemaCacheLock.RLock() + for key, value := range c.idToSchemaCache { if key.subject == subject && schemasEqual(*value, schema) { id = key.id break } } - c.idCacheLock.RUnlock() + c.idToSchemaCacheLock.RUnlock() err := c.generateVersion(subject, schema) if err != nil { return -1, err @@ -117,9 +117,9 @@ func (c *mockclient) getIDFromRegistry(subject string, schema SchemaInfo) (int, subject: subject, id: id, } - c.idCacheLock.Lock() - c.idCache[idCacheKey] = &schema - c.idCacheLock.Unlock() + c.idToSchemaCacheLock.Lock() + c.idToSchemaCache[idCacheKey] = &schema + c.idToSchemaCacheLock.Unlock() } return id, nil } @@ -140,9 +140,9 @@ func (c *mockclient) generateVersion(subject string, schema SchemaInfo) error { subject: subject, json: string(schemaJSON), } - c.versionCacheLock.Lock() - c.versionCache[cacheKey] = versionCacheEntry{newVersion, false} - c.versionCacheLock.Unlock() + c.schemaToVersionCacheLock.Lock() + c.schemaToVersionCache[cacheKey] = versionCacheEntry{newVersion, false} + c.schemaToVersionCacheLock.Unlock() return nil } @@ -153,9 +153,9 @@ func (c *mockclient) GetBySubjectAndID(subject string, id int) (schema SchemaInf subject: subject, id: id, } - c.idCacheLock.RLock() - info, ok := c.idCache[cacheKey] - c.idCacheLock.RUnlock() + c.idToSchemaCacheLock.RLock() + info, ok := c.idToSchemaCache[cacheKey] + c.idToSchemaCacheLock.RUnlock() if ok { return *info, nil } @@ -177,12 +177,12 @@ func (c *mockclient) GetID(subject string, schema SchemaInfo, normalize bool) (i subject: subject, json: string(schemaJSON), } - c.schemaCacheLock.RLock() - idCacheEntryVal, ok := c.schemaCache[cacheKey] + c.schemaToIdCacheLock.RLock() + idCacheEntryVal, ok := c.schemaToIdCache[cacheKey] if idCacheEntryVal.softDeleted { ok = false } - c.schemaCacheLock.RUnlock() + c.schemaToIdCacheLock.RUnlock() if ok { return idCacheEntryVal.id, nil } @@ -214,14 +214,14 @@ func (c *mockclient) GetLatestSchemaMetadata(subject string) (result SchemaMetad // Returns SchemaMetadata object func (c *mockclient) GetSchemaMetadata(subject string, version int) (result SchemaMetadata, err error) { var json string - c.versionCacheLock.RLock() - for key, value := range c.versionCache { + c.schemaToVersionCacheLock.RLock() + for key, value := range c.schemaToVersionCache { if key.subject == subject && value.version == version && !value.softDeleted { json = key.json break } } - c.versionCacheLock.RUnlock() + c.schemaToVersionCacheLock.RUnlock() if json == "" { posErr := url.Error{ Op: "GET", @@ -237,14 +237,14 @@ func (c *mockclient) GetSchemaMetadata(subject string, version int) (result Sche return SchemaMetadata{}, err } var id = -1 - c.idCacheLock.RLock() - for key, value := range c.idCache { + c.idToSchemaCacheLock.RLock() + for key, value := range c.idToSchemaCache { if key.subject == subject && schemasEqual(*value, info) { id = key.id break } } - c.idCacheLock.RUnlock() + c.idToSchemaCacheLock.RUnlock() if id == -1 { posErr := url.Error{ Op: "GET", @@ -279,13 +279,13 @@ func (c *mockclient) GetAllVersions(subject string) (results []int, err error) { func (c *mockclient) allVersions(subject string) (results []int) { versions := make([]int, 0) - c.versionCacheLock.RLock() - for key, value := range c.versionCache { + c.schemaToVersionCacheLock.RLock() + for key, value := range c.schemaToVersionCache { if key.subject == subject && !value.softDeleted { versions = append(versions, value.version) } } - c.versionCacheLock.RUnlock() + c.schemaToVersionCacheLock.RUnlock() sort.Ints(versions) return versions } @@ -300,17 +300,17 @@ func (c *mockclient) latestVersion(subject string) int { func (c *mockclient) deleteVersion(key subjectJSON, version int, permanent bool) { if permanent { - delete(c.versionCache, key) + delete(c.schemaToVersionCache, key) } else { - c.versionCache[key] = versionCacheEntry{version, true} + c.schemaToVersionCache[key] = versionCacheEntry{version, true} } } func (c *mockclient) deleteID(key subjectJSON, id int, permanent bool) { if permanent { - delete(c.schemaCache, key) + delete(c.schemaToIdCache, key) } else { - c.schemaCache[key] = idCacheEntry{id, true} + c.schemaToIdCache[key] = idCacheEntry{id, true} } } @@ -325,12 +325,12 @@ func (c *mockclient) GetVersion(subject string, schema SchemaInfo, normalize boo subject: subject, json: string(schemaJSON), } - c.versionCacheLock.RLock() - versionCacheEntryVal, ok := c.versionCache[cacheKey] + c.schemaToVersionCacheLock.RLock() + versionCacheEntryVal, ok := c.schemaToVersionCache[cacheKey] if versionCacheEntryVal.softDeleted { ok = false } - c.versionCacheLock.RUnlock() + c.schemaToVersionCacheLock.RUnlock() if ok { return versionCacheEntryVal.version, nil } @@ -346,13 +346,13 @@ func (c *mockclient) GetVersion(subject string, schema SchemaInfo, normalize boo // Returns a string slice containing all registered subjects func (c *mockclient) GetAllSubjects() ([]string, error) { subjects := make([]string, 0) - c.versionCacheLock.RLock() - for key, value := range c.versionCache { + c.schemaToVersionCacheLock.RLock() + for key, value := range c.schemaToVersionCache { if !value.softDeleted { subjects = append(subjects, key.subject) } } - c.versionCacheLock.RUnlock() + c.schemaToVersionCacheLock.RUnlock() sort.Strings(subjects) return subjects, nil } @@ -360,32 +360,32 @@ func (c *mockclient) GetAllSubjects() ([]string, error) { // Deletes provided Subject from registry // Returns integer slice of versions removed by delete func (c *mockclient) DeleteSubject(subject string, permanent bool) (deleted []int, err error) { - c.schemaCacheLock.Lock() - for key, value := range c.schemaCache { + c.schemaToIdCacheLock.Lock() + for key, value := range c.schemaToIdCache { if key.subject == subject && (!value.softDeleted || permanent) { c.deleteID(key, value.id, permanent) } } - c.schemaCacheLock.Unlock() - c.versionCacheLock.Lock() - for key, value := range c.versionCache { + c.schemaToIdCacheLock.Unlock() + c.schemaToVersionCacheLock.Lock() + for key, value := range c.schemaToVersionCache { if key.subject == subject && (!value.softDeleted || permanent) { c.deleteVersion(key, value.version, permanent) deleted = append(deleted, value.version) } } - c.versionCacheLock.Unlock() + c.schemaToVersionCacheLock.Unlock() c.compatibilityCacheLock.Lock() delete(c.compatibilityCache, subject) c.compatibilityCacheLock.Unlock() if permanent { - c.idCacheLock.Lock() - for key := range c.idCache { + c.idToSchemaCacheLock.Lock() + for key := range c.idToSchemaCache { if key.subject == subject { - delete(c.idCache, key) + delete(c.idToSchemaCache, key) } } - c.idCacheLock.Unlock() + c.idToSchemaCacheLock.Unlock() } return deleted, nil } @@ -393,8 +393,8 @@ func (c *mockclient) DeleteSubject(subject string, permanent bool) (deleted []in // DeleteSubjectVersion removes the version identified by delete from the subject's registration // Returns integer id for the deleted version func (c *mockclient) DeleteSubjectVersion(subject string, version int, permanent bool) (deleted int, err error) { - c.versionCacheLock.Lock() - for key, value := range c.versionCache { + c.schemaToVersionCacheLock.Lock() + for key, value := range c.schemaToVersionCache { if key.subject == subject && value.version == version { c.deleteVersion(key, value.version, permanent) schemaJSON := key.json @@ -402,24 +402,24 @@ func (c *mockclient) DeleteSubjectVersion(subject string, version int, permanent subject: subject, json: string(schemaJSON), } - c.schemaCacheLock.Lock() - idSchemaEntryVal, ok := c.schemaCache[cacheKeySchema] + c.schemaToIdCacheLock.Lock() + idSchemaEntryVal, ok := c.schemaToIdCache[cacheKeySchema] if ok { c.deleteID(key, idSchemaEntryVal.id, permanent) } - c.schemaCacheLock.Unlock() + c.schemaToIdCacheLock.Unlock() if permanent && ok { - c.idCacheLock.Lock() + c.idToSchemaCacheLock.Lock() cacheKeyID := subjectID{ subject: subject, id: idSchemaEntryVal.id, } - delete(c.idCache, cacheKeyID) - c.idCacheLock.Unlock() + delete(c.idToSchemaCache, cacheKeyID) + c.idToSchemaCacheLock.Unlock() } } } - c.versionCacheLock.Unlock() + c.schemaToVersionCacheLock.Unlock() return version, nil } diff --git a/schemaregistry/schemaregistry_client.go b/schemaregistry/schemaregistry_client.go index 06c9749ce..11be5947b 100644 --- a/schemaregistry/schemaregistry_client.go +++ b/schemaregistry/schemaregistry_client.go @@ -172,16 +172,23 @@ type subjectID struct { id int } +type subjectVersion struct { + subject string + version int +} + /* HTTP(S) Schema Registry Client and schema caches */ type client struct { sync.Mutex - restService *restService - schemaCache cache.Cache - schemaCacheLock sync.RWMutex - idCache cache.Cache - idCacheLock sync.RWMutex - versionCache cache.Cache - versionCacheLock sync.RWMutex + restService *restService + schemaToIdCache cache.Cache + schemaToIdCacheLock sync.RWMutex + idToSchemaCache cache.Cache + idToSchemaCacheLock sync.RWMutex + schemaToVersionCache cache.Cache + schemaToVersionCacheLock sync.RWMutex + versionToSchemaCache cache.Cache + versionToSchemaCacheLock sync.RWMutex } var _ Client = new(client) @@ -217,11 +224,11 @@ func NewClient(conf *Config) (Client, error) { return nil, err } mock := &mockclient{ - url: url, - schemaCache: make(map[subjectJSON]idCacheEntry), - idCache: make(map[subjectID]*SchemaInfo), - versionCache: make(map[subjectJSON]versionCacheEntry), - compatibilityCache: make(map[string]Compatibility), + url: url, + schemaToIdCache: make(map[subjectJSON]idCacheEntry), + idToSchemaCache: make(map[subjectID]*SchemaInfo), + schemaToVersionCache: make(map[subjectJSON]versionCacheEntry), + compatibilityCache: make(map[string]Compatibility), } return mock, nil } @@ -231,32 +238,39 @@ func NewClient(conf *Config) (Client, error) { return nil, err } - var schemaCache cache.Cache - var idCache cache.Cache - var versionCache cache.Cache + var schemaToIdCache cache.Cache + var idToSchemaCache cache.Cache + var schemaToVersionCache cache.Cache + var versionToSchemaCache cache.Cache if conf.CacheCapacity != 0 { - schemaCache, err = cache.NewLRUCache(conf.CacheCapacity) + schemaToIdCache, err = cache.NewLRUCache(conf.CacheCapacity) + if err != nil { + return nil, err + } + idToSchemaCache, err = cache.NewLRUCache(conf.CacheCapacity) if err != nil { return nil, err } - idCache, err = cache.NewLRUCache(conf.CacheCapacity) + schemaToVersionCache, err = cache.NewLRUCache(conf.CacheCapacity) if err != nil { return nil, err } - versionCache, err = cache.NewLRUCache(conf.CacheCapacity) + versionToSchemaCache, err = cache.NewLRUCache(conf.CacheCapacity) if err != nil { return nil, err } } else { - schemaCache = cache.NewMapCache() - idCache = cache.NewMapCache() - versionCache = cache.NewMapCache() + schemaToIdCache = cache.NewMapCache() + idToSchemaCache = cache.NewMapCache() + schemaToVersionCache = cache.NewMapCache() + versionToSchemaCache = cache.NewMapCache() } handle := &client{ - restService: restService, - schemaCache: schemaCache, - idCache: idCache, - versionCache: versionCache, + restService: restService, + schemaToIdCache: schemaToIdCache, + idToSchemaCache: idToSchemaCache, + schemaToVersionCache: schemaToVersionCache, + versionToSchemaCache: versionToSchemaCache, } return handle, nil } @@ -271,9 +285,9 @@ func (c *client) Register(subject string, schema SchemaInfo, normalize bool) (id subject: subject, json: string(schemaJSON), } - c.schemaCacheLock.RLock() - idValue, ok := c.schemaCache.Get(cacheKey) - c.schemaCacheLock.RUnlock() + c.schemaToIdCacheLock.RLock() + idValue, ok := c.schemaToIdCache.Get(cacheKey) + c.schemaToIdCacheLock.RUnlock() if ok { return idValue.(int), nil } @@ -281,20 +295,20 @@ func (c *client) Register(subject string, schema SchemaInfo, normalize bool) (id metadata := SchemaMetadata{ SchemaInfo: schema, } - c.schemaCacheLock.Lock() + c.schemaToIdCacheLock.Lock() // another goroutine could have already put it in cache - idValue, ok = c.schemaCache.Get(cacheKey) + idValue, ok = c.schemaToIdCache.Get(cacheKey) if !ok { err = c.restService.handleRequest(newRequest("POST", versionNormalize, &metadata, url.PathEscape(subject), normalize), &metadata) if err == nil { - c.schemaCache.Put(cacheKey, metadata.ID) + c.schemaToIdCache.Put(cacheKey, metadata.ID) } else { metadata.ID = -1 } } else { metadata.ID = idValue.(int) } - c.schemaCacheLock.Unlock() + c.schemaToIdCacheLock.Unlock() return metadata.ID, err } @@ -305,18 +319,18 @@ func (c *client) GetBySubjectAndID(subject string, id int) (schema SchemaInfo, e subject: subject, id: id, } - c.idCacheLock.RLock() - infoValue, ok := c.idCache.Get(cacheKey) - c.idCacheLock.RUnlock() + c.idToSchemaCacheLock.RLock() + infoValue, ok := c.idToSchemaCache.Get(cacheKey) + c.idToSchemaCacheLock.RUnlock() if ok { return *infoValue.(*SchemaInfo), nil } metadata := SchemaMetadata{} newInfo := &SchemaInfo{} - c.idCacheLock.Lock() + c.idToSchemaCacheLock.Lock() // another goroutine could have already put it in cache - infoValue, ok = c.idCache.Get(cacheKey) + infoValue, ok = c.idToSchemaCache.Get(cacheKey) if !ok { if len(subject) > 0 { err = c.restService.handleRequest(newRequest("GET", schemasBySubject, nil, id, url.QueryEscape(subject)), &metadata) @@ -329,12 +343,12 @@ func (c *client) GetBySubjectAndID(subject string, id int) (schema SchemaInfo, e SchemaType: metadata.SchemaType, References: metadata.References, } - c.idCache.Put(cacheKey, newInfo) + c.idToSchemaCache.Put(cacheKey, newInfo) } } else { newInfo = infoValue.(*SchemaInfo) } - c.idCacheLock.Unlock() + c.idToSchemaCacheLock.Unlock() return *newInfo, err } @@ -348,9 +362,9 @@ func (c *client) GetID(subject string, schema SchemaInfo, normalize bool) (id in subject: subject, json: string(schemaJSON), } - c.schemaCacheLock.RLock() - idValue, ok := c.schemaCache.Get(cacheKey) - c.schemaCacheLock.RUnlock() + c.schemaToIdCacheLock.RLock() + idValue, ok := c.schemaToIdCache.Get(cacheKey) + c.schemaToIdCacheLock.RUnlock() if ok { return idValue.(int), nil } @@ -358,20 +372,20 @@ func (c *client) GetID(subject string, schema SchemaInfo, normalize bool) (id in metadata := SchemaMetadata{ SchemaInfo: schema, } - c.schemaCacheLock.Lock() + c.schemaToIdCacheLock.Lock() // another goroutine could have already put it in cache - idValue, ok = c.schemaCache.Get(cacheKey) + idValue, ok = c.schemaToIdCache.Get(cacheKey) if !ok { err = c.restService.handleRequest(newRequest("POST", subjectsNormalize, &metadata, url.PathEscape(subject), normalize), &metadata) if err == nil { - c.schemaCache.Put(cacheKey, metadata.ID) + c.schemaToIdCache.Put(cacheKey, metadata.ID) } else { metadata.ID = -1 } } else { metadata.ID = idValue.(int) } - c.schemaCacheLock.Unlock() + c.schemaToIdCacheLock.Unlock() return metadata.ID, err } @@ -386,8 +400,29 @@ func (c *client) GetLatestSchemaMetadata(subject string) (result SchemaMetadata, // GetSchemaMetadata fetches the requested subject schema identified by version // Returns SchemaMetadata object func (c *client) GetSchemaMetadata(subject string, version int) (result SchemaMetadata, err error) { - err = c.restService.handleRequest(newRequest("GET", versions, nil, url.PathEscape(subject), version), &result) + cacheKey := subjectVersion{ + subject: subject, + version: version, + } + c.versionToSchemaCacheLock.RLock() + metadataValue, ok := c.versionToSchemaCache.Get(cacheKey) + c.versionToSchemaCacheLock.RUnlock() + if ok { + return *metadataValue.(*SchemaMetadata), nil + } + c.versionToSchemaCacheLock.Lock() + // another goroutine could have already put it in cache + metadataValue, ok = c.versionToSchemaCache.Get(cacheKey) + if !ok { + err = c.restService.handleRequest(newRequest("GET", versions, nil, url.PathEscape(subject), version), &result) + if err == nil { + c.versionToSchemaCache.Put(cacheKey, &result) + } + } else { + result = *metadataValue.(*SchemaMetadata) + } + c.versionToSchemaCacheLock.Unlock() return result, err } @@ -411,9 +446,9 @@ func (c *client) GetVersion(subject string, schema SchemaInfo, normalize bool) ( subject: subject, json: string(schemaJSON), } - c.versionCacheLock.RLock() - versionValue, ok := c.versionCache.Get(cacheKey) - c.versionCacheLock.RUnlock() + c.schemaToVersionCacheLock.RLock() + versionValue, ok := c.schemaToVersionCache.Get(cacheKey) + c.schemaToVersionCacheLock.RUnlock() if ok { return versionValue.(int), nil } @@ -421,20 +456,20 @@ func (c *client) GetVersion(subject string, schema SchemaInfo, normalize bool) ( metadata := SchemaMetadata{ SchemaInfo: schema, } - c.versionCacheLock.Lock() + c.schemaToVersionCacheLock.Lock() // another goroutine could have already put it in cache - versionValue, ok = c.versionCache.Get(cacheKey) + versionValue, ok = c.schemaToVersionCache.Get(cacheKey) if !ok { err = c.restService.handleRequest(newRequest("POST", subjectsNormalize, &metadata, url.PathEscape(subject), normalize), &metadata) if err == nil { - c.versionCache.Put(cacheKey, metadata.Version) + c.schemaToVersionCache.Put(cacheKey, metadata.Version) } else { metadata.Version = -1 } } else { metadata.Version = versionValue.(int) } - c.versionCacheLock.Unlock() + c.schemaToVersionCacheLock.Unlock() return metadata.Version, err } @@ -450,30 +485,40 @@ func (c *client) GetAllSubjects() ([]string, error) { // Deletes provided Subject from registry // Returns integer slice of versions removed by delete func (c *client) DeleteSubject(subject string, permanent bool) (deleted []int, err error) { - c.schemaCacheLock.Lock() - for keyValue := range c.schemaCache.ToMap() { + c.schemaToIdCacheLock.Lock() + for keyValue := range c.schemaToIdCache.ToMap() { key := keyValue.(subjectJSON) if key.subject == subject { - c.schemaCache.Delete(key) + c.schemaToIdCache.Delete(key) } } - c.schemaCacheLock.Unlock() - c.versionCacheLock.Lock() - for keyValue := range c.versionCache.ToMap() { + c.schemaToIdCacheLock.Unlock() + c.schemaToVersionCacheLock.Lock() + for keyValue := range c.schemaToVersionCache.ToMap() { key := keyValue.(subjectJSON) if key.subject == subject { - c.versionCache.Delete(key) + c.schemaToVersionCache.Delete(key) } } - c.versionCacheLock.Unlock() - c.idCacheLock.Lock() - for keyValue := range c.idCache.ToMap() { + c.schemaToVersionCacheLock.Unlock() + if permanent { + c.versionToSchemaCacheLock.Lock() + for keyValue := range c.versionToSchemaCache.ToMap() { + key := keyValue.(subjectVersion) + if key.subject == subject { + c.versionToSchemaCache.Delete(key) + } + } + c.versionToSchemaCacheLock.Unlock() + } + c.idToSchemaCacheLock.Lock() + for keyValue := range c.idToSchemaCache.ToMap() { key := keyValue.(subjectID) if key.subject == subject { - c.idCache.Delete(key) + c.idToSchemaCache.Delete(key) } } - c.idCacheLock.Unlock() + c.idToSchemaCacheLock.Unlock() var result []int err = c.restService.handleRequest(newRequest("DELETE", subjectsDelete, nil, url.PathEscape(subject), permanent), &result) return result, err @@ -482,35 +527,44 @@ func (c *client) DeleteSubject(subject string, permanent bool) (deleted []int, e // DeleteSubjectVersion removes the version identified by delete from the subject's registration // Returns integer id for the deleted version func (c *client) DeleteSubjectVersion(subject string, version int, permanent bool) (deleted int, err error) { - c.versionCacheLock.Lock() - for keyValue, value := range c.versionCache.ToMap() { + c.schemaToVersionCacheLock.Lock() + for keyValue, value := range c.schemaToVersionCache.ToMap() { key := keyValue.(subjectJSON) if key.subject == subject && value == version { - c.versionCache.Delete(key) + c.schemaToVersionCache.Delete(key) schemaJSON := key.json cacheKeySchema := subjectJSON{ subject: subject, json: string(schemaJSON), } - c.schemaCacheLock.Lock() - idValue, ok := c.schemaCache.Get(cacheKeySchema) + c.schemaToIdCacheLock.Lock() + idValue, ok := c.schemaToIdCache.Get(cacheKeySchema) if ok { - c.schemaCache.Delete(cacheKeySchema) + c.schemaToIdCache.Delete(cacheKeySchema) } - c.schemaCacheLock.Unlock() + c.schemaToIdCacheLock.Unlock() if ok { id := idValue.(int) - c.idCacheLock.Lock() + c.idToSchemaCacheLock.Lock() cacheKeyID := subjectID{ subject: subject, id: id, } - c.idCache.Delete(cacheKeyID) - c.idCacheLock.Unlock() + c.idToSchemaCache.Delete(cacheKeyID) + c.idToSchemaCacheLock.Unlock() } } } - c.versionCacheLock.Unlock() + c.schemaToVersionCacheLock.Unlock() + if permanent { + c.versionToSchemaCacheLock.Lock() + cacheKey := subjectVersion{ + subject: subject, + version: version, + } + c.versionToSchemaCache.Delete(cacheKey) + c.versionToSchemaCacheLock.Unlock() + } var result int err = c.restService.handleRequest(newRequest("DELETE", versionsDelete, nil, url.PathEscape(subject), version, permanent), &result) return result, err From effa125f666a01b7222d1c4faad2a9d9edcc6625 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 16 Aug 2023 09:17:29 -0700 Subject: [PATCH 2/2] Minor fix --- schemaregistry/schemaregistry_client.go | 28 +++++++++++-------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/schemaregistry/schemaregistry_client.go b/schemaregistry/schemaregistry_client.go index 11be5947b..26df25c01 100644 --- a/schemaregistry/schemaregistry_client.go +++ b/schemaregistry/schemaregistry_client.go @@ -501,16 +501,14 @@ func (c *client) DeleteSubject(subject string, permanent bool) (deleted []int, e } } c.schemaToVersionCacheLock.Unlock() - if permanent { - c.versionToSchemaCacheLock.Lock() - for keyValue := range c.versionToSchemaCache.ToMap() { - key := keyValue.(subjectVersion) - if key.subject == subject { - c.versionToSchemaCache.Delete(key) - } + c.versionToSchemaCacheLock.Lock() + for keyValue := range c.versionToSchemaCache.ToMap() { + key := keyValue.(subjectVersion) + if key.subject == subject { + c.versionToSchemaCache.Delete(key) } - c.versionToSchemaCacheLock.Unlock() } + c.versionToSchemaCacheLock.Unlock() c.idToSchemaCacheLock.Lock() for keyValue := range c.idToSchemaCache.ToMap() { key := keyValue.(subjectID) @@ -556,15 +554,13 @@ func (c *client) DeleteSubjectVersion(subject string, version int, permanent boo } } c.schemaToVersionCacheLock.Unlock() - if permanent { - c.versionToSchemaCacheLock.Lock() - cacheKey := subjectVersion{ - subject: subject, - version: version, - } - c.versionToSchemaCache.Delete(cacheKey) - c.versionToSchemaCacheLock.Unlock() + c.versionToSchemaCacheLock.Lock() + cacheKey := subjectVersion{ + subject: subject, + version: version, } + c.versionToSchemaCache.Delete(cacheKey) + c.versionToSchemaCacheLock.Unlock() var result int err = c.restService.handleRequest(newRequest("DELETE", versionsDelete, nil, url.PathEscape(subject), version, permanent), &result) return result, err