From 0b75be5ff31d1390a623182f34162708e1a9306e Mon Sep 17 00:00:00 2001 From: Tim Middleton Date: Thu, 5 Jun 2025 11:38:08 +0800 Subject: [PATCH 1/3] Add coherence state store component - updated Signed-off-by: Tim Middleton --- .../docker-compose-coherence.yml | 13 + go.mod | 1 + go.sum | 5 +- state/coherence/coherence.go | 435 ++++++++++++++++++ state/coherence/coherence_test.go | 232 ++++++++++ state/coherence/metadata.yaml | 72 +++ tests/certification/go.mod | 1 + tests/certification/go.sum | 6 +- tests/certification/state/coherence/README.md | 45 ++ .../state/coherence/coherence_test.go | 310 +++++++++++++ .../docker/default/coherencestatestore.yaml | 12 + .../nearcachememory/coherencestatestore.yaml | 14 + .../nearcachettl/coherencestatestore.yaml | 14 + .../coherencestatestore.yaml | 16 + .../coherencestatestore.yaml | 16 + .../nearcacheunits/coherencestatestore.yaml | 14 + .../docker/scope/coherencestatestore.yaml | 14 + .../certification/state/coherence/config.yaml | 6 + .../state/coherence/docker-compose.yaml | 13 + tests/config/state/coherence/statestore.yaml | 10 + tests/config/state/tests.yml | 2 + tests/conformance/state_test.go | 3 + 22 files changed, 1251 insertions(+), 3 deletions(-) create mode 100644 .github/infrastructure/docker-compose-coherence.yml create mode 100644 state/coherence/coherence.go create mode 100644 state/coherence/coherence_test.go create mode 100644 state/coherence/metadata.yaml create mode 100644 tests/certification/state/coherence/README.md create mode 100644 tests/certification/state/coherence/coherence_test.go create mode 100644 tests/certification/state/coherence/components/docker/default/coherencestatestore.yaml create mode 100644 tests/certification/state/coherence/components/docker/nearcachememory/coherencestatestore.yaml create mode 100644 tests/certification/state/coherence/components/docker/nearcachettl/coherencestatestore.yaml create mode 100644 tests/certification/state/coherence/components/docker/nearcachettlandmemory/coherencestatestore.yaml create mode 100644 tests/certification/state/coherence/components/docker/nearcachettlandunits/coherencestatestore.yaml create mode 100644 tests/certification/state/coherence/components/docker/nearcacheunits/coherencestatestore.yaml create mode 100644 tests/certification/state/coherence/components/docker/scope/coherencestatestore.yaml create mode 100644 tests/certification/state/coherence/config.yaml create mode 100644 tests/certification/state/coherence/docker-compose.yaml create mode 100644 tests/config/state/coherence/statestore.yaml diff --git a/.github/infrastructure/docker-compose-coherence.yml b/.github/infrastructure/docker-compose-coherence.yml new file mode 100644 index 0000000000..de9641cae2 --- /dev/null +++ b/.github/infrastructure/docker-compose-coherence.yml @@ -0,0 +1,13 @@ +services: + coherence: + image: 'ghcr.io/oracle/coherence-ce:14.1.2-0-2-java17' + environment: + - coherence.management.http=all + - coherence.management.http.port=30000 + - Dcoherence.health.http.port=6676 + - coherence.wka=127.0.0.1 + ports: + - 30000:30000 + - 1408:1408 + - 9612:9612 + - 6676:6676 diff --git a/go.mod b/go.mod index 447e658417..5ce10a44dc 100644 --- a/go.mod +++ b/go.mod @@ -98,6 +98,7 @@ require ( github.com/nats-io/nats.go v1.31.0 github.com/nats-io/nkeys v0.4.6 github.com/open-policy-agent/opa v1.4.2 + github.com/oracle/coherence-go-client/v2 v2.2.0 github.com/oracle/oci-go-sdk/v54 v54.0.0 github.com/pashagolub/pgxmock/v2 v2.12.0 github.com/patrickmn/go-cache v2.1.0+incompatible diff --git a/go.sum b/go.sum index a0b5adfa27..506a3a4f62 100644 --- a/go.sum +++ b/go.sum @@ -1361,8 +1361,9 @@ github.com/onsi/gomega v1.24.1/go.mod h1:3AOiACssS3/MajrniINInwbfOOtfZvplPzuRSmv github.com/onsi/gomega v1.26.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/onsi/gomega v1.27.1/go.mod h1:aHX5xOykVYzWOV4WqQy0sy8BQptgukenXpCXfadcIAw= github.com/onsi/gomega v1.27.3/go.mod h1:5vG284IBtfDAmDyrK+eGyZmUgUlmi+Wngqo557cZ6Gw= -github.com/onsi/gomega v1.27.4 h1:Z2AnStgsdSayCMDiCU42qIz+HLqEPcgiOCXjAU/w+8E= github.com/onsi/gomega v1.27.4/go.mod h1:riYq/GJKh8hhoM01HN6Vmuy93AarCXCBGpvFDK3q3fQ= +github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= +github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/open-policy-agent/opa v1.4.2 h1:ag4upP7zMsa4WE2p1pwAFeG4Pn3mNwfAx9DLhhJfbjU= github.com/open-policy-agent/opa v1.4.2/go.mod h1:DNzZPKqKh4U0n0ANxcCVlw8lCSv2c+h5G/3QvSYdWZ8= @@ -1380,6 +1381,8 @@ github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5/go.mod h1:/wsWhb9smxS github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= +github.com/oracle/coherence-go-client/v2 v2.2.0 h1:ZO8tsN8Z4JTFGnoERez+rYYlQAyH7g1MwSY5THG7c+o= +github.com/oracle/coherence-go-client/v2 v2.2.0/go.mod h1:IUOIVsyaeccST2AZa/F3/PpY8uukF5Sy3Ko79pqleO0= github.com/oracle/oci-go-sdk/v54 v54.0.0 h1:CDLjeSejv2aDpElAJrhKpi6zvT/zhZCZuXchUUZ+LS4= github.com/oracle/oci-go-sdk/v54 v54.0.0/go.mod h1:+t+yvcFGVp+3ZnztnyxqXfQDsMlq8U25faBLa+mqCMc= github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM= diff --git a/state/coherence/coherence.go b/state/coherence/coherence.go new file mode 100644 index 0000000000..4b7641bd93 --- /dev/null +++ b/state/coherence/coherence.go @@ -0,0 +1,435 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package coherence + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "reflect" + "strconv" + "time" + + coh "github.com/oracle/coherence-go-client/v2/coherence" + "github.com/oracle/coherence-go-client/v2/coherence/filters" + "github.com/oracle/coherence-go-client/v2/coherence/processors" + + "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/state" + stateutils "github.com/dapr/components-contrib/state/utils" + "github.com/dapr/kit/logger" + kitmd "github.com/dapr/kit/metadata" +) + +const ( + serverAddressConfig = "serverAddress" + tlsEnabledConfig = "tlsEnabled" + tlsClientCertPathConfig = "tlsClientCertPath" + tlsClientKeyConfig = "tlsClientKey" + tlsCertsPathConfig = "tlsCertsPath" + ignoreInvalidCerts = "ignoreInvalidCerts" + requestTimeoutConfig = "requestTimeout" + nearCacheTTLConfig = "nearCacheTTL" + nearCacheUnitsConfig = "nearCacheUnits" + nearCacheMemoryConfig = "nearCacheMemory" + scopeNameConfig = "scopeName" + defaultScopeNameConfig = "default" +) + +var errTrueOrFalse = errors.New("value should be true or false") + +type Coherence struct { + state.BulkStore + + session *coh.Session + namedCache coh.NamedCache[string, []byte] + logger logger.Logger +} + +type coherenceMetadata struct { + ServerAddress string `json:"serverAddress"` + TLSEnabled bool `json:"tlsEnabled"` + TLSClientCertPath string `json:"tlsClientCertPathConfig"` + TLSClientKey string `json:"tlsClientKey"` + TLSCertsPath string `json:"tlsCertsPath"` + IgnoreInvalidCerts bool `json:"ignoreInvalidCerts"` + RequestTimeout time.Duration `json:"requestTimeout"` + NearCacheTTL time.Duration `json:"nearCacheTimeout"` + NearCacheUnits int64 `json:"nearCacheUnits"` + NearCacheMemory int64 `json:"nearCacheMemory"` + ScopeName string `json:"scopeName"` +} + +// NewCoherenceStateStore returns a new Coherence state store. +func NewCoherenceStateStore(logger logger.Logger) state.Store { + c := &Coherence{logger: logger} + c.BulkStore = state.NewDefaultBulkStore(c) + return c +} + +func (c *Coherence) Init(_ context.Context, metadata state.Metadata) error { + meta, err := retrieveCoherenceMetadata(metadata) + if err != nil { + return err + } + + options := make([]func(session *coh.SessionOptions), 0) + + options = append(options, coh.WithAddress(meta.ServerAddress)) + + // configure TLS and options + if !meta.TLSEnabled { + options = append(options, coh.WithPlainText()) + } + + if meta.TLSClientCertPath != "" { + options = append(options, coh.WithTLSClientCert(meta.TLSClientCertPath)) + } + if meta.TLSCertsPath != "" { + options = append(options, coh.WithTLSCertsPath(meta.TLSCertsPath)) + } + if meta.TLSClientKey != "" { + options = append(options, coh.WithTLSClientKey(meta.TLSClientKey)) + } + if meta.IgnoreInvalidCerts { + options = append(options, coh.WithIgnoreInvalidCerts()) + } + + options = append(options, coh.WithRequestTimeout(meta.RequestTimeout)) + + // create the Coherence session + session, err := coh.NewSession(context.Background(), options...) + if err != nil { + return err + } + + c.logger.Info("Created session", session) + + cacheOptions := make([]func(session *coh.CacheOptions), 0) + // create the cache and configure a near cache if the nearCacheTimeout is set + if meta.NearCacheTTL != 0 || meta.NearCacheUnits != 0 || meta.NearCacheMemory != 0 { + nearCacheOptions := coh.NearCacheOptions{TTL: meta.NearCacheTTL, HighUnits: meta.NearCacheUnits, HighUnitsMemory: meta.NearCacheMemory} + cacheOptions = append(cacheOptions, coh.WithNearCache(&nearCacheOptions)) + } + + nc, err := coh.GetNamedCache[string, []byte](session, "dapr$"+meta.ScopeName, cacheOptions...) + if err != nil { + return err + } + + c.namedCache = nc + c.session = session + + c.logger.Info("Using cache", nc.Name()) + + return nil +} + +func retrieveCoherenceMetadata(meta state.Metadata) (*coherenceMetadata, error) { + c := coherenceMetadata{ + ServerAddress: "localhost:1408", + TLSEnabled: false, + RequestTimeout: time.Duration(30) * time.Second, + NearCacheTTL: 0, + ScopeName: defaultScopeNameConfig, + } + + var ( + duration time.Duration + err error + booleanVal bool + int64Val int64 + ) + + err = kitmd.DecodeMetadata(meta.Properties, &c) + if err != nil { + return nil, err + } + + // retrieve the server address + val := getStringMetaProperty(meta, serverAddressConfig) + if val != "" { + c.ServerAddress = val + } + + // retrieve the scope name + val = getStringMetaProperty(meta, scopeNameConfig) + if val != "" { + c.ScopeName = val + } + + // retrieve the request timeout in millis + duration, err = getDurationMetaProperty(meta, requestTimeoutConfig) + if err != nil { + return nil, err + } + if duration > 0 { + c.RequestTimeout = duration + } + + // retrieve the near cache timeout and values + duration, err = getDurationMetaProperty(meta, nearCacheTTLConfig) + if err != nil { + return nil, err + } + c.NearCacheTTL = duration + + nearCacheUnits := getStringMetaProperty(meta, nearCacheUnitsConfig) + nearCacheMemory := getStringMetaProperty(meta, nearCacheMemoryConfig) + + if nearCacheUnits != "" && nearCacheMemory != "" { + return nil, errors.New(nearCacheUnitsConfig + " and " + nearCacheMemoryConfig + "are mutually exclusive") + } + + if nearCacheUnits != "" { + int64Val, err = getInt64ValueFromString(nearCacheUnits) + if err != nil { + return nil, err + } + if int64Val > 0 { + c.NearCacheUnits = int64Val + } + } + + if nearCacheMemory != "" { + int64Val, err = getInt64ValueFromString(nearCacheMemory) + if err != nil { + return nil, err + } + if int64Val > 0 { + c.NearCacheMemory = int64Val + } + } + + // validate if TLS is enabled + booleanVal, err = getBoolMetaProperty(meta, tlsEnabledConfig) + if err != nil { + return nil, errors.New(tlsEnabledConfig + " should be true or false") + } + c.TLSEnabled = booleanVal + + booleanVal, err = getBoolMetaProperty(meta, ignoreInvalidCerts) + if err != nil { + return nil, errors.New(ignoreInvalidCerts + " should be true or false") + } + c.IgnoreInvalidCerts = booleanVal + + c.TLSClientKey = getStringMetaProperty(meta, tlsClientKeyConfig) + c.TLSCertsPath = getStringMetaProperty(meta, tlsCertsPathConfig) + c.TLSClientCertPath = getStringMetaProperty(meta, tlsClientCertPathConfig) + + if c.TLSEnabled { + // do extra validation + if c.TLSClientCertPath == "" || c.TLSClientKey == "" { + return nil, errors.New(tlsClientCertPathConfig + " and " + tlsClientCertPathConfig + " must be set when TLS is enabled") + } + } + + return &c, nil +} + +func getStringMetaProperty(meta state.Metadata, key string) string { + if val, ok := meta.Properties[key]; ok && val != "" { + return val + } + return "" +} + +func getInt64ValueFromString(value string) (int64, error) { + result, err := strconv.Atoi(value) + if err != nil { + return 0, fmt.Errorf("invalid value for %s: %w", value, err) + } + return int64(result), nil +} + +func getBoolMetaProperty(meta state.Metadata, key string) (bool, error) { + if val, ok := meta.Properties[key]; ok && val != "" { + if val != "true" && val != "false" { + return false, errTrueOrFalse + } + return val == "true", nil + } + return false, nil // default of "" is false +} + +func getDurationMetaProperty(meta state.Metadata, key string) (time.Duration, error) { + if val, ok := meta.Properties[key]; ok && val != "" { + t, err := time.ParseDuration(val) + if err != nil { + return 0, fmt.Errorf("could not parse %s value of %s - %v", key, val, err) + } + return t, nil + } + return 0, nil +} + +func (c *Coherence) BulkGet(ctx context.Context, req []state.GetRequest, _ state.BulkGetOpts) ([]state.BulkGetResponse, error) { + var ( + err error + responses = make([]state.BulkGetResponse, 0) + foundKeys = make([]string, 0) + ) + err = state.CheckRequestOptions(req) + if err != nil { + return responses, err + } + + keys := make([]string, 0, len(req)) + for _, k := range req { + keys = append(keys, k.Key) + } + + ch := c.namedCache.GetAll(ctx, keys) + for entry := range ch { + if entry.Err != nil { + return responses, entry.Err + } + responses = append(responses, state.BulkGetResponse{Key: entry.Key, Data: entry.Value}) + foundKeys = append(foundKeys, entry.Key) + } + + // note: when a get is done and the key is not there, we should have an entry with a nil value + foundKeySet := make(map[string]bool, len(foundKeys)) + for _, k := range foundKeys { + foundKeySet[k] = true + } + + for _, k := range keys { + if !foundKeySet[k] { + responses = append(responses, state.BulkGetResponse{Key: k, Data: nil}) + } + } + + return responses, nil +} + +func (c *Coherence) BulkSet(ctx context.Context, req []state.SetRequest, _ state.BulkStoreOpts) error { + var ( + err error + bytesValue []byte + ) + + err = state.CheckRequestOptions(req) + if err != nil { + return err + } + + buffer := make(map[string][]byte, len(req)) + + for _, v := range req { + bytesValue, err = stateutils.Marshal(v.Value, json.Marshal) + if err != nil { + return err + } + buffer[v.Key] = bytesValue + } + + return c.namedCache.PutAll(ctx, buffer) +} + +func (c *Coherence) BulkDelete(ctx context.Context, req []state.DeleteRequest, opts state.BulkStoreOpts) error { + var err error + err = state.CheckRequestOptions(req) + if err != nil { + return err + } + + keys := make([]string, 0, len(req)) + for _, k := range req { + keys = append(keys, k.Key) + } + + return coh.InvokeAllKeysBlind[string, []byte](ctx, c.namedCache, keys, processors.ConditionalRemove(filters.Always())) +} + +func (c *Coherence) Ping(ctx context.Context) error { + ready, err := c.namedCache.IsReady(ctx) + if err != nil { + return err + } + + if ready { + return nil + } + return errors.New("coherence IsReady() returned false") +} + +// Features returns the features available in this state store. +func (c *Coherence) Features() []state.Feature { + return []state.Feature{ + state.FeatureTTL, + } +} + +func (c *Coherence) Set(ctx context.Context, req *state.SetRequest) error { + ttl := time.Duration(0) + err := state.CheckRequestOptions(req) + if err != nil { + return err + } + bytesValue, err := stateutils.Marshal(req.Value, json.Marshal) + if err != nil { + return err + } + + // see if we have a TTL specified + ttlInSeconds, err := stateutils.ParseTTL64(req.Metadata) + if err != nil { + return err + } + if ttlInSeconds != nil { + ttl = time.Duration(*ttlInSeconds) * time.Second + } + + _, err = c.namedCache.PutWithExpiry(ctx, req.Key, bytesValue, ttl) + return err +} + +func (c *Coherence) Delete(ctx context.Context, req *state.DeleteRequest) error { + _, err := c.namedCache.Remove(ctx, req.Key) + + return err +} + +func (c *Coherence) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) { + v, err := c.namedCache.Get(ctx, req.Key) + if err != nil { + return nil, err + } + + if v == nil { + // nothing returned from Coherence + return &state.GetResponse{}, nil + } + + // serialize the value to byte + return &state.GetResponse{ + Data: *v, + }, nil +} + +func (c *Coherence) Close() (err error) { + if c.session != nil { + c.session.Close() + } + return nil +} + +func (c *Coherence) GetComponentMetadata() (metadataInfo metadata.MetadataMap) { + metadataStruct := coherenceMetadata{} + _ = metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.StateStoreType) + return +} diff --git a/state/coherence/coherence_test.go b/state/coherence/coherence_test.go new file mode 100644 index 0000000000..f6379ff743 --- /dev/null +++ b/state/coherence/coherence_test.go @@ -0,0 +1,232 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package coherence + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/state" +) + +func TestValidateBaseMetadata(t *testing.T) { + t.Run("no configuration all defaults", func(t *testing.T) { + properties := map[string]string{} + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + meta, err := retrieveCoherenceMetadata(m) + require.NoError(t, err) + assert.False(t, meta.TLSEnabled) + assert.Equal(t, "localhost:1408", meta.ServerAddress) + assert.Equal(t, time.Duration(30)*time.Second, meta.RequestTimeout) + assert.Equal(t, defaultScopeNameConfig, meta.ScopeName) + assert.Equal(t, time.Duration(0), meta.NearCacheTTL) + assert.Equal(t, int64(0), meta.NearCacheUnits) + assert.Equal(t, int64(0), meta.NearCacheMemory) + }) + + t.Run("without valid request timeout", func(t *testing.T) { + properties := map[string]string{ + "requestTimeout": "more rubbish", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + _, err := retrieveCoherenceMetadata(m) + require.Error(t, err) + }) + + t.Run("with valid request timeout", func(t *testing.T) { + properties := map[string]string{ + requestTimeoutConfig: "35s", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + meta, err := retrieveCoherenceMetadata(m) + require.NoError(t, err) + assert.Equal(t, time.Duration(35000)*time.Millisecond, meta.RequestTimeout) + }) + + t.Run("without default request timeout", func(t *testing.T) { + properties := map[string]string{} + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + meta, err := retrieveCoherenceMetadata(m) + require.NoError(t, err) + assert.Equal(t, time.Duration(30)*time.Second, meta.RequestTimeout) + }) +} + +func TestNTLSMetadata(t *testing.T) { + t.Run("without valid tlsEnabled", func(t *testing.T) { + properties := map[string]string{ + tlsEnabledConfig: "rubbish", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + _, err := retrieveCoherenceMetadata(m) + require.Error(t, err) + }) + + t.Run("without valid ignoreInvalidCerts", func(t *testing.T) { + properties := map[string]string{ + ignoreInvalidCerts: "rubbish", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + _, err := retrieveCoherenceMetadata(m) + require.Error(t, err) + }) + + t.Run("with tlsEnabled but no certs", func(t *testing.T) { + properties := map[string]string{ + tlsEnabledConfig: "true", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + _, err := retrieveCoherenceMetadata(m) + require.Error(t, err) + }) + + t.Run("with valid tlsEnabled", func(t *testing.T) { + properties := map[string]string{ + tlsEnabledConfig: "true", + tlsClientKeyConfig: "keyConfig", + tlsClientCertPathConfig: "certConfig", + tlsCertsPathConfig: "certsPath", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + meta, err := retrieveCoherenceMetadata(m) + require.NoError(t, err) + assert.True(t, meta.TLSEnabled) + assert.Equal(t, "keyConfig", meta.TLSClientKey) + assert.Equal(t, "certConfig", meta.TLSClientCertPath) + assert.Equal(t, "certsPath", meta.TLSCertsPath) + }) + + t.Run("with valid false tls", func(t *testing.T) { + properties := map[string]string{ + "tlsEnabled": "false", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + meta, err := retrieveCoherenceMetadata(m) + require.NoError(t, err) + assert.False(t, meta.TLSEnabled) + }) +} + +func TestNearCacheMetadata(t *testing.T) { + t.Run("with valid near cache ttl", func(t *testing.T) { + properties := map[string]string{ + nearCacheTTLConfig: "30s", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + meta, err := retrieveCoherenceMetadata(m) + require.NoError(t, err) + assert.Equal(t, time.Duration(30)*time.Second, meta.NearCacheTTL) + assert.Equal(t, int64(0), meta.NearCacheMemory) + assert.Equal(t, int64(0), meta.NearCacheUnits) + }) + + t.Run("with invalid near cache ttl", func(t *testing.T) { + properties := map[string]string{ + nearCacheTTLConfig: "more rubbish", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + _, err := retrieveCoherenceMetadata(m) + require.Error(t, err) + }) + + t.Run("with invalid units and memory", func(t *testing.T) { + properties := map[string]string{ + nearCacheMemoryConfig: "300000", + nearCacheUnitsConfig: "300000", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + _, err := retrieveCoherenceMetadata(m) + require.Error(t, err) + }) + + t.Run("with valid near cache ttl and units", func(t *testing.T) { + properties := map[string]string{ + nearCacheTTLConfig: "30s", + nearCacheUnitsConfig: "300000", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + meta, err := retrieveCoherenceMetadata(m) + require.NoError(t, err) + assert.Equal(t, time.Duration(30)*time.Second, meta.NearCacheTTL) + assert.Equal(t, int64(300000), meta.NearCacheUnits) + assert.Equal(t, int64(0), meta.NearCacheMemory) + }) + + t.Run("with valid near cache ttl and memory", func(t *testing.T) { + properties := map[string]string{ + nearCacheTTLConfig: "30s", + nearCacheMemoryConfig: "310000", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + meta, err := retrieveCoherenceMetadata(m) + require.NoError(t, err) + assert.Equal(t, time.Duration(30)*time.Second, meta.NearCacheTTL) + assert.Equal(t, int64(310000), meta.NearCacheMemory) + assert.Equal(t, int64(0), meta.NearCacheUnits) + }) + + t.Run("with invalid units", func(t *testing.T) { + properties := map[string]string{ + nearCacheUnitsConfig: "xyz", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + _, err := retrieveCoherenceMetadata(m) + require.Error(t, err) + }) + + t.Run("with invalid memory", func(t *testing.T) { + properties := map[string]string{ + nearCacheMemoryConfig: "abc", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + _, err := retrieveCoherenceMetadata(m) + require.Error(t, err) + }) +} diff --git a/state/coherence/metadata.yaml b/state/coherence/metadata.yaml new file mode 100644 index 0000000000..16ed18d2b0 --- /dev/null +++ b/state/coherence/metadata.yaml @@ -0,0 +1,72 @@ +# yaml-language-server: $schema=../../component-metadata-schema.json +schemaVersion: v1 +type: state +name: coherence +version: v1 +status: alpha +title: "Coherence" +urls: + - title: Reference + url: https://docs.dapr.io/reference/components-reference/supported-state-stores/setup-coherence/ +capabilities: + - crud + - ttl +metadata: + - name: serverAddress + type: string + required: true + description: Coherence gRPC Server Address. + default: "localhost:1408" + example: "localhost:1408" + - name: tlsEnabled + type: bool + required: false + example: "false" + default: "false" + description: Indicates if TLS should be enabled. + - name: tlsClientCertPath + required: false + description: Client certificate path for Coherence. Can be secretKeyRef to use a secret reference + example: "" + type: string + - name: tlsClientKey + required: false + description: Client key for Coherence. Can be secretKeyRef to use a secret reference + example: "" + type: string + - name: tlsCertsPath + required: false + description: Additional certificates for Coherence. Can be secretKeyRef to use a secret reference + example: "" + type: string + - name: ignoreInvalidCerts + type: bool + required: false + example: "false" + default: "false" + description: Indicates if to ignore self-signed certificates for testing only, not to be used in production. + - name: scopeName + type: string + description: A scope name to use for the internal cache. This allows you to set multiple scopes and have different state sessions at a time. + default: "" + example: "scope1" + - name: requestTimeout + type: duration + description: Timeout for calls to the cluster. + default: "30s" + example: "30s" + - name: nearCacheTTL + type: duration + description: If non zero a near cache will be used and the TTL of the near cache will be this value. + default: "0" + example: "30s" + - name: nearCacheUnits + type: number + description: If non zero a near cache will be used and the maximum size of the near cache will be this value in units. + default: "0" + example: "1000" + - name: nearCacheMemory + type: number + description: If non zero a near cache will be used and the maximum size of the near cache will be this value in bytes. + default: "0" + example: "1000" diff --git a/tests/certification/go.mod b/tests/certification/go.mod index 683f91c391..d5aac48b9e 100644 --- a/tests/certification/go.mod +++ b/tests/certification/go.mod @@ -244,6 +244,7 @@ require ( github.com/open-policy-agent/opa v1.4.2 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/openzipkin/zipkin-go v0.4.3 // indirect + github.com/oracle/coherence-go-client/v2 v2.2.0 // indirect github.com/panjf2000/ants/v2 v2.8.1 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect diff --git a/tests/certification/go.sum b/tests/certification/go.sum index 0400ae507b..5f7c98f9ab 100644 --- a/tests/certification/go.sum +++ b/tests/certification/go.sum @@ -1122,8 +1122,8 @@ github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042 github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= -github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= +github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= +github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/open-policy-agent/opa v1.4.2 h1:ag4upP7zMsa4WE2p1pwAFeG4Pn3mNwfAx9DLhhJfbjU= github.com/open-policy-agent/opa v1.4.2/go.mod h1:DNzZPKqKh4U0n0ANxcCVlw8lCSv2c+h5G/3QvSYdWZ8= @@ -1143,6 +1143,8 @@ github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnh github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/openzipkin/zipkin-go v0.4.3 h1:9EGwpqkgnwdEIJ+Od7QVSEIH+ocmm5nPat0G7sjsSdg= github.com/openzipkin/zipkin-go v0.4.3/go.mod h1:M9wCJZFWCo2RiY+o1eBCEMe0Dp2S5LDHcMZmk3RmK7c= +github.com/oracle/coherence-go-client/v2 v2.2.0 h1:ZO8tsN8Z4JTFGnoERez+rYYlQAyH7g1MwSY5THG7c+o= +github.com/oracle/coherence-go-client/v2 v2.2.0/go.mod h1:IUOIVsyaeccST2AZa/F3/PpY8uukF5Sy3Ko79pqleO0= github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM= github.com/panjf2000/ants/v2 v2.8.1 h1:C+n/f++aiW8kHCExKlpX6X+okmxKXP7DWLutxuAPuwQ= github.com/panjf2000/ants/v2 v2.8.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= diff --git a/tests/certification/state/coherence/README.md b/tests/certification/state/coherence/README.md new file mode 100644 index 0000000000..b514273cc5 --- /dev/null +++ b/tests/certification/state/coherence/README.md @@ -0,0 +1,45 @@ +# Coherence State Store certification testing + +This project aims to test the [Coherence State Store] component under various conditions. + +This state store [supports the following features][features]: +* CRUD +* TTL + +# Test plan + +## Basic Test for CRUD operations: +1. Able to create and test connection. +2. Able to do set, fetch, update and delete. +3. Negative test to fetch record with key, that is not present. + +## Test save or update data with different TTL settings: +1. Provide a TTL of 5 second: + 1. Fetch this record just after saving, should exist + 2. Sleep for 6 seconds + 3. Try to fetch again after a gap of 2 seconds, record shouldn't be found + +## Test Bulk Operations + +1. Able to issue SaveBulkState +2. Able to issue GetBulkState +3. Able to issue DeleteBulkState + +## Out of scope + +1Tests for [features not implemented by Coherence][features] are out of scope. This includes + * Transactional + * ETag + * Actors + * Query + + +# References: + +* [Coherence State Component reference page][Coherence State Store] +* [List of state stores and their features][features] +* [Coherence Go Client API reference](https://pkg.go.dev/github.com/oracle/coherence-go-client/v2/coherence) +* [Coherence Community](https://coherence.community/) + +[Coherence State Store]: https://docs.dapr.io/reference/components-reference/supported-state-stores/setup-coherence/ +[features]: https://docs.dapr.io/reference/components-reference/supported-state-stores/ \ No newline at end of file diff --git a/tests/certification/state/coherence/coherence_test.go b/tests/certification/state/coherence/coherence_test.go new file mode 100644 index 0000000000..dd86f33ed0 --- /dev/null +++ b/tests/certification/state/coherence/coherence_test.go @@ -0,0 +1,310 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package coherence_test + +import ( + "bytes" + "context" + "fmt" + "net/http" + "os" + "strconv" + "testing" + "time" + + "github.com/dapr/components-contrib/state" + "github.com/dapr/go-sdk/client" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + state_coherence "github.com/dapr/components-contrib/state/coherence" + "github.com/dapr/components-contrib/tests/certification/embedded" + "github.com/dapr/components-contrib/tests/certification/flow" + "github.com/dapr/components-contrib/tests/certification/flow/dockercompose" + "github.com/dapr/components-contrib/tests/certification/flow/sidecar" + state_loader "github.com/dapr/dapr/pkg/components/state" + dapr_testing "github.com/dapr/dapr/pkg/testing" + "github.com/dapr/kit/logger" +) + +const ( + dockerComposeClusterYAML = "docker-compose.yaml" + sidecarNamePrefix = "coherence-sidecar-" + stateStoreName = "statestore" + certificationTestPrefix = "stable-certification-" + testKey1 = certificationTestPrefix + "key1" + testKey2 = certificationTestPrefix + "key2" + testKey1Value = "coherenceKey1" + testKey2Value = "coherenceKey2" + testUpdateValue = "coherenceUpdateValue" + testNonexistentKey = "ThisKeyDoesNotExistInTheStateStore" + daprSideCar = sidecarNamePrefix + "dockerClusterDefault" +) + +var ( + gracefulShutdownTimeout = time.Second * 10 + currentT *testing.T + currentGrpcPort int + currentHTTPPort int + + // Basic CRUD tests + basicTestCRUDTests = func(ctx flow.Context) error { + cl, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort)) + if err != nil { + panic(err) + } + defer cl.Close() + + err = cl.SaveState(ctx, stateStoreName, testKey1, []byte(testKey1Value), nil) + require.NoError(currentT, err) + + err = cl.SaveState(ctx, stateStoreName, testKey2, []byte(testKey2Value), nil) + require.NoError(currentT, err) + + // get state + item, err := cl.GetState(ctx, stateStoreName, testKey1, nil) + require.NoError(currentT, err) + assert.Equal(currentT, testKey1Value, string(item.Value)) + + errUpdate := cl.SaveState(ctx, stateStoreName, testKey1, []byte(testUpdateValue), nil) + require.NoError(currentT, errUpdate) + item, errUpdatedGet := cl.GetState(ctx, stateStoreName, testKey1, nil) + require.NoError(currentT, errUpdatedGet) + assert.Equal(currentT, testUpdateValue, string(item.Value)) + + // delete state + err = cl.DeleteState(ctx, stateStoreName, testKey1, nil) + require.NoError(currentT, err) + + assertNilValue(ctx, currentT, cl, testKey1) + + // nonexistent key + item, err = cl.GetState(ctx, stateStoreName, testNonexistentKey, nil) + require.NoError(currentT, err) + assert.Nil(currentT, nil, item) + + return nil + } + + testTTL = func(ctx flow.Context) error { + cl, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort)) + if err != nil { + panic(err) + } + defer cl.Close() + + key := certificationTestPrefix + "_expiresInOneSecondKey" + value := "This key will expire in 5 second" + + ttlExpirationTime := 5 * time.Second + ttlInSeconds := int(ttlExpirationTime.Seconds()) + mapOptionsExpiringKey := map[string]string{ + "ttlInSeconds": strconv.Itoa(ttlInSeconds), + } + + errSave := cl.SaveState(ctx, stateStoreName, key, []byte(value), mapOptionsExpiringKey) + require.NoError(currentT, errSave) + + item, errGetBeforeTTLExpiration := cl.GetState(ctx, stateStoreName, key, nil) + require.NoError(currentT, errGetBeforeTTLExpiration) + assert.Equal(currentT, value, string(item.Value)) + + time.Sleep(6 * ttlExpirationTime) + + assertNilValue(ctx, currentT, cl, key) + + return nil + } + + testBulkOperations = func(ctx flow.Context) error { + cl, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort)) + if err != nil { + panic(err) + } + defer cl.Close() + + // add multiple entries + setReq := make([]*client.SetStateItem, 0) + setReq = append(setReq, &client.SetStateItem{Key: "1", Value: []byte("value1")}) + setReq = append(setReq, &client.SetStateItem{Key: "2", Value: []byte("value2")}) + setReq = append(setReq, &client.SetStateItem{Key: "3", Value: []byte("value3")}) + + errSaveBulk := cl.SaveBulkState(ctx, stateStoreName, setReq...) + require.NoError(currentT, errSaveBulk) + + // get back multiple entries + bulkStateItems, errGetBulk := cl.GetBulkState(ctx, stateStoreName, []string{"1", "2", "3"}, map[string]string{}, 0) + require.NoError(currentT, errGetBulk) + assert.Equal(currentT, 3, len(bulkStateItems)) + + for _, v := range bulkStateItems { + k := v.Key + assert.Equal(currentT, fmt.Sprintf("value%s", k), string(v.Value)) + } + + // test bulkGet non-existent key + bulkStateItemsNoKey, errGetBulkNoKey := cl.GetBulkState(ctx, stateStoreName, []string{"1234"}, map[string]string{}, 0) + require.NoError(currentT, errGetBulkNoKey) + assert.Equal(currentT, 1, len(bulkStateItemsNoKey)) + assert.Nil(currentT, bulkStateItemsNoKey[0].Value) + + // test bulkGet with partial non-existent keys + bulkStateItems2, errGetBulk2 := cl.GetBulkState(ctx, stateStoreName, []string{"1", "2", "5000"}, map[string]string{}, 0) + require.NoError(currentT, errGetBulk2) + assert.Equal(currentT, 3, len(bulkStateItems2)) + + for _, v := range bulkStateItems2 { + k := v.Key + if k == "5000" { + assert.Nil(currentT, v.Value) + } else { + assert.Equal(currentT, fmt.Sprintf("value%s", k), string(v.Value)) + } + } + + // test bulk delete + bulkDeleteErr := cl.DeleteBulkState(ctx, stateStoreName, []string{"1", "2", "3"}, map[string]string{}) + require.NoError(currentT, bulkDeleteErr) + + assertNilValue(ctx, currentT, cl, "1") + assertNilValue(ctx, currentT, cl, "2") + assertNilValue(ctx, currentT, cl, "3") + + return nil + } + + checkCoherenceReady = func(ctx flow.Context) error { + maxTries := 30 + + for i := 0; i < maxTries; i++ { + time.Sleep(1 * time.Second) + + tr := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + } + client := &http.Client{Transport: tr, + Timeout: time.Duration(30) * time.Second, + CheckRedirect: func(_ *http.Request, _ []*http.Request) error { + return http.ErrUseLastResponse + }} + + req, err := http.NewRequest("GET", "http://127.0.0.1:6676/ready", bytes.NewBuffer([]byte{})) + if err != nil { + continue + } + + resp, err := client.Do(req) + if err != nil { + continue + } + + defer resp.Body.Close() + + if resp.StatusCode == 200 { + return nil + } + continue + } + + return fmt.Errorf("timeout after %d tries", maxTries) + } +) + +func TestCoherenceStandard(t *testing.T) { + RunTestCoherence(t, "components/docker/default") +} + +func TestCoherenceScope(t *testing.T) { + RunTestCoherence(t, "components/docker/scope") +} + +func TestCoherenceNearCacheTTL(t *testing.T) { + RunTestCoherence(t, "components/docker/nearcachettl") +} + +func TestCoherenceNearCacheMemory(t *testing.T) { + RunTestCoherence(t, "components/docker/nearcachememory") +} + +func TestCoherenceNearCacheUnits(t *testing.T) { + RunTestCoherence(t, "components/docker/nearcacheunits") +} + +func TestCoherenceNearCacheTTLAndUnits(t *testing.T) { + RunTestCoherence(t, "components/docker/nearcachettlandunits") +} + +func TestCoherenceNearCacheTTLAndMemory(t *testing.T) { + RunTestCoherence(t, "components/docker/nearcachettlandmemory") +} + +func RunTestCoherence(t *testing.T, resourcesPath string) { + _ = os.Setenv("COHERENCE_LOG_LEVEL", "DEBUG") + currentT = t + log := logger.NewLogger("dapr.components") + stateStore := state_coherence.NewCoherenceStateStore(log) + + require.NoError(t, setCurrentFreePorts()) + + flow.New(t, "Connecting Coherence And Test basic operations"). + Step(dockercompose.Run("coherence", dockerComposeClusterYAML)). + Step("Waiting for component to start...", flow.Sleep(10*time.Second)). + Step(sidecar.Run(daprSideCar, + append(componentRuntimeOptions(stateStore, log, "coherence"), + embedded.WithoutApp(), + embedded.WithGracefulShutdownDuration(gracefulShutdownTimeout), + embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)), + embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)), + embedded.WithResourcesPath(resourcesPath), + )..., + )). + Step("Waiting for Coherence to be ready...", checkCoherenceReady). + Step("Run basic CRUD tests", basicTestCRUDTests). + Step("Run TTL tests", testTTL). + Step("Run Bulk Operations tests", testBulkOperations). + Step("Stop Coherence server", dockercompose.Stop("coherence", dockerComposeClusterYAML)). + Step("stop dapr", sidecar.Stop(sidecarNamePrefix+"dockerDefault")). + Run() +} + +func assertNilValue(ctx context.Context, t *testing.T, cl client.Client, key string) { + item, err := cl.GetState(ctx, stateStoreName, key, nil) + require.NoError(t, err) + assert.NotNil(t, item.Key) + assert.Nil(t, item.Value) +} + +func componentRuntimeOptions(stateStore state.Store, log logger.Logger, stateStoreName string) []embedded.Option { + stateRegistry := state_loader.NewRegistry() + stateRegistry.Logger = log + componentFactory := func(l logger.Logger) state.Store { return stateStore } + + stateRegistry.RegisterComponent(componentFactory, stateStoreName) + + return []embedded.Option{ + embedded.WithStates(stateRegistry), + } +} + +func setCurrentFreePorts() error { + ports, err := dapr_testing.GetFreePorts(2) + if err != nil { + return err + } + currentGrpcPort = ports[0] + currentHTTPPort = ports[1] + + return nil +} diff --git a/tests/certification/state/coherence/components/docker/default/coherencestatestore.yaml b/tests/certification/state/coherence/components/docker/default/coherencestatestore.yaml new file mode 100644 index 0000000000..9e63813ac3 --- /dev/null +++ b/tests/certification/state/coherence/components/docker/default/coherencestatestore.yaml @@ -0,0 +1,12 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.coherence + version: v1 + metadata: + - name: serverAddress + value: localhost:1408 + - name: requestTimeout + value: "60s" \ No newline at end of file diff --git a/tests/certification/state/coherence/components/docker/nearcachememory/coherencestatestore.yaml b/tests/certification/state/coherence/components/docker/nearcachememory/coherencestatestore.yaml new file mode 100644 index 0000000000..b0dc08c08e --- /dev/null +++ b/tests/certification/state/coherence/components/docker/nearcachememory/coherencestatestore.yaml @@ -0,0 +1,14 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.coherence + version: v1 + metadata: + - name: serverAddress + value: localhost:1408 + - name: requestTimeout + value: "60s" + - name: nearCacheMemory + value: "4096" \ No newline at end of file diff --git a/tests/certification/state/coherence/components/docker/nearcachettl/coherencestatestore.yaml b/tests/certification/state/coherence/components/docker/nearcachettl/coherencestatestore.yaml new file mode 100644 index 0000000000..7ea0bade6c --- /dev/null +++ b/tests/certification/state/coherence/components/docker/nearcachettl/coherencestatestore.yaml @@ -0,0 +1,14 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.coherence + version: v1 + metadata: + - name: serverAddress + value: localhost:1408 + - name: requestTimeout + value: "60s" + - name: nearCacheTTL + value: "5s" \ No newline at end of file diff --git a/tests/certification/state/coherence/components/docker/nearcachettlandmemory/coherencestatestore.yaml b/tests/certification/state/coherence/components/docker/nearcachettlandmemory/coherencestatestore.yaml new file mode 100644 index 0000000000..093e3382d7 --- /dev/null +++ b/tests/certification/state/coherence/components/docker/nearcachettlandmemory/coherencestatestore.yaml @@ -0,0 +1,16 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.coherence + version: v1 + metadata: + - name: serverAddress + value: localhost:1408 + - name: requestTimeout + value: "60s" + - name: nearCacheMemory + value: "4096" + - name: nearCacheTTL + value: "10s" \ No newline at end of file diff --git a/tests/certification/state/coherence/components/docker/nearcachettlandunits/coherencestatestore.yaml b/tests/certification/state/coherence/components/docker/nearcachettlandunits/coherencestatestore.yaml new file mode 100644 index 0000000000..93d8af8784 --- /dev/null +++ b/tests/certification/state/coherence/components/docker/nearcachettlandunits/coherencestatestore.yaml @@ -0,0 +1,16 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.coherence + version: v1 + metadata: + - name: serverAddress + value: localhost:1408 + - name: requestTimeout + value: "60s" + - name: nearCacheUnits + value: "2" + - name: nearCacheTTL + value: "10s" \ No newline at end of file diff --git a/tests/certification/state/coherence/components/docker/nearcacheunits/coherencestatestore.yaml b/tests/certification/state/coherence/components/docker/nearcacheunits/coherencestatestore.yaml new file mode 100644 index 0000000000..6eeb7fb95c --- /dev/null +++ b/tests/certification/state/coherence/components/docker/nearcacheunits/coherencestatestore.yaml @@ -0,0 +1,14 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.coherence + version: v1 + metadata: + - name: serverAddress + value: localhost:1408 + - name: requestTimeout + value: "60s" + - name: nearCacheUnits + value: "2" \ No newline at end of file diff --git a/tests/certification/state/coherence/components/docker/scope/coherencestatestore.yaml b/tests/certification/state/coherence/components/docker/scope/coherencestatestore.yaml new file mode 100644 index 0000000000..9ae282b005 --- /dev/null +++ b/tests/certification/state/coherence/components/docker/scope/coherencestatestore.yaml @@ -0,0 +1,14 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.coherence + version: v1 + metadata: + - name: serverAddress + value: "localhost:1408" + - name: scopeName + value: "new-scope" + - name: requestTimeout + value: "60s" \ No newline at end of file diff --git a/tests/certification/state/coherence/config.yaml b/tests/certification/state/coherence/config.yaml new file mode 100644 index 0000000000..6c95e632ff --- /dev/null +++ b/tests/certification/state/coherence/config.yaml @@ -0,0 +1,6 @@ +apiVersion: dapr.io/v1alpha1 +kind: Configuration +metadata: + name: keyvaultconfig +spec: + features: diff --git a/tests/certification/state/coherence/docker-compose.yaml b/tests/certification/state/coherence/docker-compose.yaml new file mode 100644 index 0000000000..de9641cae2 --- /dev/null +++ b/tests/certification/state/coherence/docker-compose.yaml @@ -0,0 +1,13 @@ +services: + coherence: + image: 'ghcr.io/oracle/coherence-ce:14.1.2-0-2-java17' + environment: + - coherence.management.http=all + - coherence.management.http.port=30000 + - Dcoherence.health.http.port=6676 + - coherence.wka=127.0.0.1 + ports: + - 30000:30000 + - 1408:1408 + - 9612:9612 + - 6676:6676 diff --git a/tests/config/state/coherence/statestore.yaml b/tests/config/state/coherence/statestore.yaml new file mode 100644 index 0000000000..aa9010373f --- /dev/null +++ b/tests/config/state/coherence/statestore.yaml @@ -0,0 +1,10 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.coherence + version: v1 + metadata: + - name: serverAddress + value: localhost:1408 \ No newline at end of file diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index 5e5328e098..ed0f60de09 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -29,6 +29,8 @@ components: config: # This component requires etags to be hex-encoded numbers badEtag: "FFFF" + - component: coherence + operations: [ "ttl" ] - component: sqlserver operations: [ "transaction", "etag", "first-write", "ttl" ] config: diff --git a/tests/conformance/state_test.go b/tests/conformance/state_test.go index 1a8441742c..e102a68b27 100644 --- a/tests/conformance/state_test.go +++ b/tests/conformance/state_test.go @@ -31,6 +31,7 @@ import ( s_cassandra "github.com/dapr/components-contrib/state/cassandra" s_cloudflareworkerskv "github.com/dapr/components-contrib/state/cloudflare/workerskv" s_cockroachdb_v1 "github.com/dapr/components-contrib/state/cockroachdb" + s_coherence "github.com/dapr/components-contrib/state/coherence" s_etcd "github.com/dapr/components-contrib/state/etcd" s_gcpfirestore "github.com/dapr/components-contrib/state/gcp/firestore" s_inmemory "github.com/dapr/components-contrib/state/in-memory" @@ -140,6 +141,8 @@ func loadStateStore(name string) state.Store { return s_gcpfirestore.NewFirestoreStateStore(testLogger) case "gcp.firestore.cloud": return s_gcpfirestore.NewFirestoreStateStore(testLogger) + case "coherence": + return s_coherence.NewCoherenceStateStore(testLogger) default: return nil } From 32ce43c98095064324093360ff5a08bad9a10b61 Mon Sep 17 00:00:00 2001 From: Tim Middleton Date: Tue, 17 Jun 2025 10:40:35 +0800 Subject: [PATCH 2/3] Address review comments Signed-off-by: Tim Middleton --- state/coherence/coherence.go | 43 ++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/state/coherence/coherence.go b/state/coherence/coherence.go index 4b7641bd93..7d98f0f0c4 100644 --- a/state/coherence/coherence.go +++ b/state/coherence/coherence.go @@ -46,9 +46,15 @@ const ( nearCacheMemoryConfig = "nearCacheMemory" scopeNameConfig = "scopeName" defaultScopeNameConfig = "default" + defaultServerAddress = "localhost:1408" + defaultTLSEnabled = false ) -var errTrueOrFalse = errors.New("value should be true or false") +var ( + errTrueOrFalse = errors.New("value should be true or false") + defaultRequestTimeout = time.Duration(30) * time.Second + defaultNearCacheTTL = time.Duration(0) +) type Coherence struct { state.BulkStore @@ -92,24 +98,23 @@ func (c *Coherence) Init(_ context.Context, metadata state.Metadata) error { // configure TLS and options if !meta.TLSEnabled { options = append(options, coh.WithPlainText()) - } - - if meta.TLSClientCertPath != "" { - options = append(options, coh.WithTLSClientCert(meta.TLSClientCertPath)) - } - if meta.TLSCertsPath != "" { - options = append(options, coh.WithTLSCertsPath(meta.TLSCertsPath)) - } - if meta.TLSClientKey != "" { - options = append(options, coh.WithTLSClientKey(meta.TLSClientKey)) - } - if meta.IgnoreInvalidCerts { - options = append(options, coh.WithIgnoreInvalidCerts()) + } else { + if meta.TLSClientCertPath != "" { + options = append(options, coh.WithTLSClientCert(meta.TLSClientCertPath)) + } + if meta.TLSCertsPath != "" { + options = append(options, coh.WithTLSCertsPath(meta.TLSCertsPath)) + } + if meta.TLSClientKey != "" { + options = append(options, coh.WithTLSClientKey(meta.TLSClientKey)) + } + if meta.IgnoreInvalidCerts { + options = append(options, coh.WithIgnoreInvalidCerts()) + } } options = append(options, coh.WithRequestTimeout(meta.RequestTimeout)) - // create the Coherence session session, err := coh.NewSession(context.Background(), options...) if err != nil { return err @@ -139,10 +144,10 @@ func (c *Coherence) Init(_ context.Context, metadata state.Metadata) error { func retrieveCoherenceMetadata(meta state.Metadata) (*coherenceMetadata, error) { c := coherenceMetadata{ - ServerAddress: "localhost:1408", - TLSEnabled: false, - RequestTimeout: time.Duration(30) * time.Second, - NearCacheTTL: 0, + ServerAddress: defaultServerAddress, + TLSEnabled: defaultTLSEnabled, + RequestTimeout: defaultRequestTimeout, + NearCacheTTL: defaultNearCacheTTL, ScopeName: defaultScopeNameConfig, } From 664bb0d1e385ef59506eed94f9b1aa4e03a0e290 Mon Sep 17 00:00:00 2001 From: Tim Middleton Date: Thu, 26 Jun 2025 08:52:20 +0800 Subject: [PATCH 3/3] Update tests/certification/state/coherence/README.md Co-authored-by: Cassie Coyle Signed-off-by: Tim Middleton --- tests/certification/state/coherence/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/certification/state/coherence/README.md b/tests/certification/state/coherence/README.md index b514273cc5..89f62bf6ca 100644 --- a/tests/certification/state/coherence/README.md +++ b/tests/certification/state/coherence/README.md @@ -27,7 +27,7 @@ This state store [supports the following features][features]: ## Out of scope -1Tests for [features not implemented by Coherence][features] are out of scope. This includes +1. Tests for [features not implemented by Coherence][features] are out of scope. This includes * Transactional * ETag * Actors