Skip to content

Commit

Permalink
Merge pull request #48 from dstendardi/do-not-disable-cache-when-fetc…
Browse files Browse the repository at this point in the history
…hing-schema

Do not disable cache when fetching a schema
  • Loading branch information
riferrei authored Aug 9, 2021
2 parents d08dc22 + d3e14b9 commit 663329a
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 35 deletions.
16 changes: 0 additions & 16 deletions schemaRegistryClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,8 @@ func (client *SchemaRegistryClient) GetSchema(schemaID int) (*Schema, error) {
// GetLatestSchema gets the schema associated with the given subject.
// The schema returned contains the last version for that subject.
func (client *SchemaRegistryClient) GetLatestSchema(subject string, isKey bool) (*Schema, error) {

// In order to ensure consistency, we need
// to temporarily disable caching to force
// the retrieval of the latest release from
// Schema Registry.
cachingEnabled := client.getCachingEnabled()
client.CachingEnabled(false)
concreteSubject := getConcreteSubject(subject, isKey)
schema, err := client.getVersion(concreteSubject, "latest")
client.CachingEnabled(cachingEnabled)

return schema, err
}
Expand All @@ -204,15 +196,7 @@ func (client *SchemaRegistryClient) GetLatestSchema(subject string, isKey bool)
// '-value' or '-key' is not appended to the subject
// The schema returned contains the last version for that subject.
func (client *SchemaRegistryClient) GetLatestSchemaWithArbitrarySubject(subject string) (*Schema, error) {

// In order to ensure consistency, we need
// to temporarily disable caching to force
// the retrieval of the latest release from
// Schema Registry.
cachingEnabled := client.getCachingEnabled()
client.CachingEnabled(false)
schema, err := client.getVersion(subject, "latest")
client.CachingEnabled(cachingEnabled)

return schema, err
}
Expand Down
95 changes: 76 additions & 19 deletions schemaRegistryClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -146,36 +147,92 @@ func TestSchemaRegistryClient_GetSchemaByVersionWithArbitrarySubjectWithReferenc
}

func TestSchemaRegistryClient_GetSchemaByVersionWithArbitrarySubjectWithoutReferences(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
responsePayload := schemaResponse{
Subject: "test1",
Version: 1,
Schema: "payload",
ID: 1,
References: nil,
}
response, _ := json.Marshal(responsePayload)

switch req.URL.String() {
case "/subjects/test1/versions/1":
// Send response to be tested
rw.Write(response)
default:
require.Fail(t, "unhandled request")
}

}))
server, call := mockServerWithSchemaResponse(t,"test1", "1", schemaResponse{
Subject: "test1",
Version: 1,
Schema: "payload",
ID: 1,
References: nil,
})

srClient := CreateSchemaRegistryClient(server.URL)
srClient.CodecCreationEnabled(false)
schema, err := srClient.GetSchemaByVersionWithArbitrarySubject("test1", 1)

// Test response
assert.NoError(t, err)

assert.Equal(t, 1, *call)
assert.Equal(t, schema.ID(), 1)
assert.Nil(t, schema.codec)
assert.Equal(t, schema.Schema(), "payload")
assert.Equal(t, schema.Version(), 1)
assert.Nil(t, schema.References())
assert.Equal(t, len(schema.References()), 0)
}

func TestSchemaRegistryClient_GetSchemaByVersionWithArbitrarySubjectReturnsValueFromCache(t *testing.T) {
server, call := mockServerWithSchemaResponse(t,"test1", "1", schemaResponse{
Subject: "test1",
Version: 1,
Schema: "payload",
ID: 1,
References: nil,
})

srClient := CreateSchemaRegistryClient(server.URL)
schema1, err := srClient.GetSchemaByVersionWithArbitrarySubject("test1", 1)

// Test response
assert.NoError(t, err)

// When called twice
schema2, err := srClient.GetSchemaByVersionWithArbitrarySubject("test1", 1)

assert.NoError(t, err)
assert.Equal(t, 1, *call)
assert.Equal(t, schema1, schema2)
}

func TestSchemaRegistryClient_GetLatestSchemaReturnsValueFromCache(t *testing.T) {
server, call := mockServerWithSchemaResponse(t,"test1-value", "latest", schemaResponse{
Subject: "test1",
Version: 1,
Schema: "payload",
ID: 1,
References: nil,
})

srClient := CreateSchemaRegistryClient(server.URL)
schema1, err := srClient.GetLatestSchema("test1", false)

// Test response
assert.NoError(t, err)

// When called twice
schema2, err := srClient.GetLatestSchema("test1", false)

assert.NoError(t, err)
assert.Equal(t, 1, *call)
assert.Equal(t, schema1, schema2)
}


func mockServerWithSchemaResponse(t *testing.T, subject string, version string, schemaResponse schemaResponse) (*httptest.Server, *int) {
var count int
return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
count++
response, _ := json.Marshal(schemaResponse)

switch req.URL.String() {
case fmt.Sprintf("/subjects/%s/versions/%s", subject, version):
// Send response to be tested
_, err := rw.Write(response)
if err != nil {
t.Errorf("could not write response %s", err)
}
default:
require.Fail(t, "unhandled request")
}
})), &count
}

0 comments on commit 663329a

Please sign in to comment.