Skip to content

Commit

Permalink
Remove IsKeys and Arbitrary Functions, Expose Http Client (#49)
Browse files Browse the repository at this point in the history
* Remove isKeys and arbitrary functions

* Add CreateSchemaRegistryClientWithOptions
  • Loading branch information
AtakanColak committed Aug 10, 2021
1 parent a92778f commit f62b456
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 303 deletions.
59 changes: 7 additions & 52 deletions mockSchemaRegistryClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,7 @@ we set this schema as the first version of the subject and store it in memory.
Note that there is no enforcement of schema compatibility, any schema goes for all subjects.
*/
func (mck MockSchemaRegistryClient) CreateSchema(subject string, schema string, schemaType SchemaType, isKey bool, references ...Reference) (*Schema, error) {
concreteSubject := getConcreteSubject(subject, isKey)

return mck.CreateSchemaWithArbitrarySubject(concreteSubject, schema, schemaType, references...)
}

/*
Mock Schema creation and registration. CreateSchema behaves in two possible ways according to the scenario:
1. The schema being registered is for an already existing `concrete subject`. In that case,
we increase our schemaID counter and register the schema under that subject in memory.
2. The schema being registered is for a previously unknown `concrete subject`. In that case,
we set this schema as the first version of the subject and store it in memory.
Note that there is no enforcement of schema compatibility, any schema goes for all subjects.
*/
func (mck MockSchemaRegistryClient) CreateSchemaWithArbitrarySubject(subject string, schema string, schemaType SchemaType, references ...Reference) (*Schema, error) {
func (mck MockSchemaRegistryClient) CreateSchema(subject string, schema string, schemaType SchemaType, references ...Reference) (*Schema, error) {
switch schemaType {
case Avro, Json:
compiledRegex := regexp.MustCompile(`\r?\n`)
Expand Down Expand Up @@ -116,30 +101,14 @@ func (mck MockSchemaRegistryClient) GetSchema(schemaID int) (*Schema, error) {
}

// Returns the highest ordinal version of a Schema for a given `concrete subject`
func (mck MockSchemaRegistryClient) GetLatestSchema(subject string, isKey bool) (*Schema, error) {
versions, getSchemaVersionErr := mck.GetSchemaVersions(subject, isKey)
func (mck MockSchemaRegistryClient) GetLatestSchema(subject string) (*Schema, error) {
versions, getSchemaVersionErr := mck.GetSchemaVersions(subject)
if getSchemaVersionErr != nil {
return nil, getSchemaVersionErr
}

latestVersion := versions[len(versions)-1]
thisSchema, err := mck.GetSchemaByVersion(subject, latestVersion, isKey)
if err != nil {
return nil, err
}

return thisSchema, nil
}

// Returns the highest ordinal version of a Schema for a given `concrete subject`
func (mck MockSchemaRegistryClient) GetLatestSchemaWithArbitrarySubject(subject string) (*Schema, error) {
versions, getSchemaVersionErr := mck.GetSchemaVersionsWithArbitrarySubject(subject)
if getSchemaVersionErr != nil {
return nil, getSchemaVersionErr
}

latestVersion := versions[len(versions)-1]
thisSchema, err := mck.GetSchemaByVersionWithArbitrarySubject(subject, latestVersion)
thisSchema, err := mck.GetSchemaByVersion(subject, latestVersion)
if err != nil {
return nil, err
}
Expand All @@ -148,27 +117,13 @@ func (mck MockSchemaRegistryClient) GetLatestSchemaWithArbitrarySubject(subject
}

// Returns the array of versions this subject has previously registered
func (mck MockSchemaRegistryClient) GetSchemaVersions(subject string, isKey bool) ([]int, error) {
concreteSubject := getConcreteSubject(subject, isKey)
versions := mck.allVersions(concreteSubject)
return versions, nil
}

// Returns the array of versions this subject has previously registered
func (mck MockSchemaRegistryClient) GetSchemaVersionsWithArbitrarySubject(subject string) ([]int, error) {
func (mck MockSchemaRegistryClient) GetSchemaVersions(subject string) ([]int, error) {
versions := mck.allVersions(subject)
return versions, nil
}

// Returns the given Schema according to the passed in subject and version number
func (mck MockSchemaRegistryClient) GetSchemaByVersion(subject string, version int, isKey bool) (*Schema, error) {
concreteSubject := getConcreteSubject(subject, isKey)

return mck.GetSchemaByVersionWithArbitrarySubject(concreteSubject, version)
}

// Returns the given Schema according to the passed in subject and version number
func (mck MockSchemaRegistryClient) GetSchemaByVersionWithArbitrarySubject(subject string, version int) (*Schema, error) {
func (mck MockSchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*Schema, error) {
schema := &Schema{}
schemaVersionMap, ok := mck.schemaCache[subject]
if !ok {
Expand Down Expand Up @@ -231,7 +186,7 @@ func (mck MockSchemaRegistryClient) CodecCreationEnabled(value bool) {
// Nothing because codecs do not matter in the inMem storage of schemas
}

func (mck MockSchemaRegistryClient) IsSchemaCompatible(subject, schema, version string, schemaType SchemaType, isKey bool) (bool, error) {
func (mck MockSchemaRegistryClient) IsSchemaCompatible(subject, schema, version string, schemaType SchemaType) (bool, error) {
return false, errors.New("mock schema registry client can't check for schema compatibility")
}

Expand Down
45 changes: 17 additions & 28 deletions mockSchemaRegistryClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ func init() {
srClient = CreateMockSchemaRegistryClient("mock://testingUrl")

// Test Schema and Value Schema creation
_, _ = srClient.CreateSchema("test1", schema, Avro, false)
_, _ = srClient.CreateSchema("test1", schema, Avro, true)
_, _ = srClient.CreateSchema("test1-value", schema, Avro)
_, _ = srClient.CreateSchema("test1-key", schema, Avro)
// Test version upgrades for key and value and more registration
_, _ = srClient.CreateSchema("test1", schema2, Avro, false)
_, _ = srClient.CreateSchema("test1", schema2, Avro, true)
_, _ = srClient.CreateSchema("test1-value", schema2, Avro)
_, _ = srClient.CreateSchema("test1-key", schema2, Avro)

// Test version upgrades for key and value and more registration (arbitrary subject)
_, _ = srClient.CreateSchemaWithArbitrarySubject("test1_arb", schema3, Avro)
_, _ = srClient.CreateSchemaWithArbitrarySubject("test1_arb", schema4, Avro)
_, _ = srClient.CreateSchema("test1_arb", schema3, Avro)
_, _ = srClient.CreateSchema("test1_arb", schema4, Avro)
}

func TestMockSchemaRegistryClient_CreateSchema(t *testing.T) {
Expand All @@ -82,14 +82,6 @@ func TestMockSchemaRegistryClient_CreateSchema(t *testing.T) {
schemaReg4, _ := srClient.GetSchema(4)
assert.Equal(t, schema2, schemaReg4.schema)
assert.Equal(t, 2, schemaReg4.version)

// Test registering already registered schema
_, err := srClient.CreateSchema("test1", schema, Avro, true)
assert.EqualError(t, err, "POST \"mock://testingUrl/subjects/test1-key/versions\": Schema already registered with id 2")
}

func TestMockSchemaRegistryClient_CreateSchema_ArbitrarySubject(t *testing.T) {

schemaReg5, _ := srClient.GetSchema(5)
assert.Equal(t, schema3, schemaReg5.schema)
assert.Equal(t, 1, schemaReg5.version)
Expand All @@ -98,24 +90,25 @@ func TestMockSchemaRegistryClient_CreateSchema_ArbitrarySubject(t *testing.T) {
assert.Equal(t, 2, schemaReg6.version)

// Test registering already registered schema
_, err := srClient.CreateSchemaWithArbitrarySubject("test1_arb", schema3, Avro)
_, err := srClient.CreateSchema("test1-key", schema, Avro)
assert.EqualError(t, err, "POST \"mock://testingUrl/subjects/test1-key/versions\": Schema already registered with id 2")

// Test registering already registered schema
_, err = srClient.CreateSchema("test1_arb", schema3, Avro)
assert.EqualError(t, err, "POST \"mock://testingUrl/subjects/test1_arb/versions\": Schema already registered with id 5")
}

func TestMockSchemaRegistryClient_GetLatestSchema(t *testing.T) {

latest, err := srClient.GetLatestSchema("test1", true)
latest, err := srClient.GetLatestSchema("test1-key")
if err != nil {
fmt.Println(err.Error())
t.Fail()
} else {
assert.Equal(t, schema2, latest.schema)
}
}

func TestMockSchemaRegistryClient_GetLatestSchema_ArbitrarySubject(t *testing.T) {

latest, err := srClient.GetLatestSchemaWithArbitrarySubject("test1_arb")
latest, err = srClient.GetLatestSchema("test1_arb")
if err != nil {
fmt.Println(err.Error())
t.Fail()
Expand All @@ -125,22 +118,18 @@ func TestMockSchemaRegistryClient_GetLatestSchema_ArbitrarySubject(t *testing.T)
}

func TestMockSchemaRegistryClient_GetSchemaVersions(t *testing.T) {
versions, _ := srClient.GetSchemaVersions("test1", true)
versions, _ := srClient.GetSchemaVersions("test1-key")
assert.Equal(t, 2, len(versions))
}

func TestMockSchemaRegistryClient_GetSchemaVersions_ArbitrarySubject(t *testing.T) {
versions, _ := srClient.GetSchemaVersionsWithArbitrarySubject("test1_arb")
versions, _ = srClient.GetSchemaVersions("test1_arb")
assert.Equal(t, 2, len(versions))
}

func TestMockSchemaRegistryClient_GetSchemaByVersion(t *testing.T) {
oldVersion, _ := srClient.GetSchemaByVersion("test1", 1, false)
oldVersion, _ := srClient.GetSchemaByVersion("test1-value", 1)
assert.Equal(t, schema, oldVersion.schema)
}

func TestMockSchemaRegistryClient_GetSchemaByVersion_ArbitrarySubject(t *testing.T) {
oldVersion, _ := srClient.GetSchemaByVersionWithArbitrarySubject("test1_arb", 1)
oldVersion, _ = srClient.GetSchemaByVersion("test1_arb", 1)
assert.Equal(t, schema3, oldVersion.schema)
}

Expand Down
90 changes: 18 additions & 72 deletions schemaRegistryClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,16 @@ import (
type ISchemaRegistryClient interface {
GetSubjects() ([]string, error)
GetSchema(schemaID int) (*Schema, error)
GetLatestSchema(subject string, isKey bool) (*Schema, error)
GetLatestSchemaWithArbitrarySubject(subject string) (*Schema, error)
GetSchemaVersions(subject string, isKey bool) ([]int, error)
GetSchemaVersionsWithArbitrarySubject(subject string) ([]int, error)
GetSchemaByVersion(subject string, version int, isKey bool) (*Schema, error)
GetSchemaByVersionWithArbitrarySubject(subject string, version int) (*Schema, error)
CreateSchema(subject string, schema string, schemaType SchemaType, isKey bool, references ...Reference) (*Schema, error)
CreateSchemaWithArbitrarySubject(subject string, schema string, schemaType SchemaType, references ...Reference) (*Schema, error)
GetLatestSchema(subject string) (*Schema, error)
GetSchemaVersions(subject string) ([]int, error)
GetSchemaByVersion(subject string, version int) (*Schema, error)
CreateSchema(subject string, schema string, schemaType SchemaType, references ...Reference) (*Schema, error)
DeleteSubject(subject string, permanent bool) error
SetCredentials(username string, password string)
SetTimeout(timeout time.Duration)
CachingEnabled(value bool)
CodecCreationEnabled(value bool)
IsSchemaCompatible(subject, schema, version string, schemaType SchemaType, isKey bool) (bool, error)
IsSchemaCompatible(subject, schema, version string, schemaType SchemaType) (bool, error)
}

// SchemaRegistryClient allows interactions with
Expand Down Expand Up @@ -128,14 +124,19 @@ const (
// using this client can retrieve data about schemas, which
// in turn can be used to serialize and deserialize records.
func CreateSchemaRegistryClient(schemaRegistryURL string) *SchemaRegistryClient {
return CreateSchemaRegistryClientWithOptions(schemaRegistryURL, &http.Client{Timeout: 5 * time.Second}, 16)
}

// CreateSchemaRegistryClientWithOptions provides the ability to pass the http.Client to be used, as well as the semaphoreWeight for concurrent requests
func CreateSchemaRegistryClientWithOptions(schemaRegistryURL string, client *http.Client, semaphoreWeight int) *SchemaRegistryClient {
return &SchemaRegistryClient{
schemaRegistryURL: schemaRegistryURL,
httpClient: &http.Client{Timeout: 5 * time.Second},
httpClient: client,
cachingEnabled: true,
codecCreationEnabled: false,
idSchemaCache: make(map[int]*Schema),
subjectSchemaCache: make(map[string]*Schema),
sem: semaphore.NewWeighted(16),
sem: semaphore.NewWeighted(int64(semaphoreWeight)),
}
}

Expand Down Expand Up @@ -185,31 +186,12 @@ func (client *SchemaRegistryClient) GetSchema(schemaID int) (*Schema, error) {

// GetLatestSchema gets the schema associated with the given subject.
// The schema returned contains the last version for that subject.
func (client *SchemaRegistryClient) GetLatestSchema(subject string, isKey bool) (*Schema, error) {
concreteSubject := getConcreteSubject(subject, isKey)
schema, err := client.getVersion(concreteSubject, "latest")

return schema, err
}

// GetLatestSchemaWithArbitrarySubject gets the schema associated with the given subject.
// '-value' or '-key' is not appended to the subject
// The schema returned contains the last version for that subject.
func (client *SchemaRegistryClient) GetLatestSchemaWithArbitrarySubject(subject string) (*Schema, error) {
schema, err := client.getVersion(subject, "latest")

return schema, err
func (client *SchemaRegistryClient) GetLatestSchema(subject string) (*Schema, error) {
return client.getVersion(subject, "latest")
}

// GetSchemaVersions returns a list of versions from a given subject.
func (client *SchemaRegistryClient) GetSchemaVersions(subject string, isKey bool) ([]int, error) {
concreteSubject := getConcreteSubject(subject, isKey)

return client.GetSchemaVersionsWithArbitrarySubject(concreteSubject)
}

// GetSchemaVersionsWithArbitrarySubject returns a list of versions from a given subject (without appending '-key' or '-value').
func (client *SchemaRegistryClient) GetSchemaVersionsWithArbitrarySubject(subject string) ([]int, error) {
func (client *SchemaRegistryClient) GetSchemaVersions(subject string) ([]int, error) {
resp, err := client.httpRequest("GET", fmt.Sprintf(subjectVersions, subject), nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -240,33 +222,14 @@ func (client *SchemaRegistryClient) GetSubjects() ([]string, error) {

// GetSchemaByVersion gets the schema associated with the given subject.
// The schema returned contains the version specified as a parameter.
func (client *SchemaRegistryClient) GetSchemaByVersion(subject string, version int, isKey bool) (*Schema, error) {
concreteSubject := getConcreteSubject(subject, isKey)

return client.GetSchemaByVersionWithArbitrarySubject(concreteSubject, version)
}

// GetSchemaByVersion gets the schema associated with the given subject.
// The schema returned contains the version specified as a parameter.
func (client *SchemaRegistryClient) GetSchemaByVersionWithArbitrarySubject(subject string, version int) (*Schema, error) {
func (client *SchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*Schema, error) {
return client.getVersion(subject, strconv.Itoa(version))
}

// CreateSchema creates a new schema in Schema Registry and associates
// with the subject provided. It returns the newly created schema with
// all its associated information.
func (client *SchemaRegistryClient) CreateSchema(subject string, schema string,
schemaType SchemaType, isKey bool, references ...Reference) (*Schema, error) {

concreteSubject := getConcreteSubject(subject, isKey)

return client.CreateSchemaWithArbitrarySubject(concreteSubject, schema, schemaType, references...)
}

// CreateSchemaWithArbitrarySubject creates a new schema in Schema Registry and associates
// with the subject provided (without appending '-value' or '-key'). It returns the newly created schema with
// all its associated information.
func (client *SchemaRegistryClient) CreateSchemaWithArbitrarySubject(subject string, schema string,
schemaType SchemaType, references ...Reference) (*Schema, error) {
switch schemaType {
case Avro, Json:
Expand Down Expand Up @@ -305,7 +268,7 @@ func (client *SchemaRegistryClient) CreateSchemaWithArbitrarySubject(subject str
// this logic strongly relies on the idempotent guarantees
// from Schema Registry, as well as in the best practice
// that schemas don't change very often.
newSchema, err := client.GetLatestSchemaWithArbitrarySubject(subject)
newSchema, err := client.GetLatestSchema(subject)
if err != nil {
return nil, err
}
Expand All @@ -331,15 +294,7 @@ func (client *SchemaRegistryClient) CreateSchemaWithArbitrarySubject(subject str

// IsSchemaCompatible checks if the given schema is compatible with the given subject and version
// valid versions are versionID and "latest"
func (client *SchemaRegistryClient) IsSchemaCompatible(subject, schema, version string, schemaType SchemaType, isKey bool) (bool, error) {
concreteSubject := getConcreteSubject(subject, isKey)

return client.IsSchemaWithArbitrarySubjectCompatible(concreteSubject, schema, version, schemaType)
}

// IsSchemaWithArbitrarySubjectCompatible checks if the given schema is compatible with the given subject and version (without appending '-value' or '-key')
// valid versions are versionID and "latest"
func (client *SchemaRegistryClient) IsSchemaWithArbitrarySubjectCompatible(subject, schema, version string, schemaType SchemaType) (bool, error) {
func (client *SchemaRegistryClient) IsSchemaCompatible(subject, schema, version string, schemaType SchemaType) (bool, error) {
schemaReq := schemaRequest{Schema: schema, SchemaType: schemaType.String(), References: make([]Reference, 0)}
schemaReqBytes, err := json.Marshal(schemaReq)
if err != nil {
Expand Down Expand Up @@ -542,15 +497,6 @@ func cacheKey(subject string, version string) string {
return fmt.Sprintf("%s-%s", subject, version)
}

func getConcreteSubject(subject string, isKey bool) string {
if isKey {
subject = fmt.Sprintf("%s-key", subject)
} else {
subject = fmt.Sprintf("%s-value", subject)
}
return subject
}

func createError(resp *http.Response) error {
decoder := json.NewDecoder(resp.Body)
var errorResp struct {
Expand Down
Loading

0 comments on commit f62b456

Please sign in to comment.