Skip to content

Commit

Permalink
detach redis from skeleton plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Lavor <[email protected]>
  • Loading branch information
VladoLavor committed Apr 18, 2018
1 parent 48f8eca commit 90b9b59
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 132 deletions.
6 changes: 3 additions & 3 deletions db/keyval/etcdv3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

// Config represents a part of the etcd configuration that can be
// loaded from a file. Usually, the Config is next transformed into
// ClientConfig using ConfigToClientv3() function for use with the coreos/etcd
// ClientConfig using ConfigToClient() function for use with the coreos/etcd
// package.
type Config struct {
Endpoints []string `json:"endpoints"`
Expand Down Expand Up @@ -60,7 +60,7 @@ const (
defaultOpTimeout = 3 * time.Second
)

// ConfigToClientv3 transforms yaml configuration <yc> modelled by Config
// ConfigToClient transforms yaml configuration <yc> modelled by Config
// into ClientConfig, which is ready for use with the underlying coreos/etcd
// package.
// If the etcd endpoint addresses are not specified in the configuration,
Expand All @@ -69,7 +69,7 @@ const (
// endpoint location, a default address "127.0.0.1:2379" is assumed.
// The function may return error only if TLS connection is selected and the
// CA or client certificate is not accessible/valid.
func ConfigToClientv3(yc *Config) (*ClientConfig, error) {
func ConfigToClient(yc *Config) (*ClientConfig, error) {
dialTimeout := defaultDialTimeout
if yc.DialTimeout != 0 {
dialTimeout = yc.DialTimeout
Expand Down
2 changes: 1 addition & 1 deletion db/keyval/etcdv3/etcdv3_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func TestWatchDeleteResp(t *testing.T) {
func TestConfig(t *testing.T) {
RegisterTestingT(t)
cfg := &Config{DialTimeout: time.Second, OpTimeout: time.Second}
etcdCfg, err := ConfigToClientv3(cfg)
etcdCfg, err := ConfigToClient(cfg)
Expect(err).To(BeNil())
Expect(etcdCfg).NotTo(BeNil())
Expect(etcdCfg.OpTimeout).To(BeEquivalentTo(time.Second))
Expand Down
4 changes: 2 additions & 2 deletions db/keyval/etcdv3/plugin_impl_etcdv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func (plugin *Plugin) Init() (err error) {
if err != nil || plugin.disabled {
return err
}
// Transforms .yaml config to ETCD client configuration
etcdClientCfg, err := ConfigToClientv3(&etcdCfg)
// Transforms .yaml config to ETCD client configuration
etcdClientCfg, err := ConfigToClient(&etcdCfg)
if err != nil {
return err
}
Expand Down
54 changes: 0 additions & 54 deletions db/keyval/plugin/skeleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,57 +13,3 @@
// limitations under the License.

package plugin

import (
"github.com/ligato/cn-infra/db/keyval"
"github.com/ligato/cn-infra/db/keyval/kvproto"
"github.com/ligato/cn-infra/servicelabel"
"github.com/ligato/cn-infra/utils/safeclose"
)

// Connection defines an access to a particular key-value data store implementation.
type Connection interface {
keyval.CoreBrokerWatcher
}

// Skeleton of a KV plugin is a generic part of KV plugin.
type Skeleton struct {
serviceLabel servicelabel.ReaderAPI
name string
protoWrapper *kvproto.ProtoWrapper
connection Connection
}

// NewSkeleton creates a new instance of the Skeleton with the given connector.
// The connection is established in AfterInit phase.
func NewSkeleton(name string, serviceLabel servicelabel.ReaderAPI,
connection Connection) *Skeleton {
return &Skeleton{serviceLabel: serviceLabel, name: name, connection: connection}
}

// Init is called on plugin startup.
func (plugin *Skeleton) Init() (err error) {
plugin.protoWrapper = kvproto.NewProtoWrapperWithSerializer(plugin.connection, &keyval.SerializerJSON{})
return err
}

// AfterInit is called once all plugins have been initialized. The connection to datastore
// is established in this phase.
func (plugin *Skeleton) AfterInit() (err error) {
return nil
}

// Close cleans up the resources.
func (plugin *Skeleton) Close() error {
return safeclose.Close(plugin.connection)
}

// NewBroker creates new instance of prefixed broker that provides API with arguments of type proto.Message.
func (plugin *Skeleton) NewBroker(keyPrefix string) keyval.ProtoBroker {
return plugin.protoWrapper.NewBroker(keyPrefix)
}

// NewWatcher creates new instance of prefixed broker that provides API with arguments of type proto.Message.
func (plugin *Skeleton) NewWatcher(keyPrefix string) keyval.ProtoWatcher {
return plugin.protoWrapper.NewWatcher(keyPrefix)
}
12 changes: 6 additions & 6 deletions db/keyval/redis/bytes_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,15 @@ func TestConfig(t *testing.T) {
configs := []interface{}{nodeConfig, sentinelConfig, clusterConfig}
yamlFile := "./redis_client-unit_test.yaml"
for _, c := range configs {
client, err := CreateClient(c)
client, err := ConfigToClient(c)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(client).ShouldNot(gomega.BeNil())

config.SaveConfigToYamlFile(c, yamlFile, 0644, makeTypeHeader(c))
c, err = LoadConfig(yamlFile)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(c).ShouldNot(gomega.BeNil())
client, err = CreateClient(c)
client, err = ConfigToClient(c)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(client).ShouldNot(gomega.BeNil())
}
Expand All @@ -206,22 +206,22 @@ func TestBadConfig(t *testing.T) {
}

var cfg *NodeConfig
client, err := CreateClient(cfg)
client, err := ConfigToClient(cfg)
gomega.Expect(err).Should(gomega.HaveOccurred())
gomega.Expect(client).Should(gomega.BeNil())
client, err = CreateClient(nil)
client, err = ConfigToClient(nil)
gomega.Expect(err).Should(gomega.HaveOccurred())
gomega.Expect(client).Should(gomega.BeNil())

nodeConfig.TLS.Enabled = true
nodeConfig.TLS.CAfile = "bad CA file"
client, err = CreateClient(nodeConfig)
client, err = ConfigToClient(nodeConfig)
gomega.Expect(err).Should(gomega.HaveOccurred())
gomega.Expect(client).Should(gomega.BeNil())

nodeConfig.TLS.Certfile = "bad cert file"
nodeConfig.TLS.Keyfile = "bad key file"
client, err = CreateClient(nodeConfig)
client, err = ConfigToClient(nodeConfig)
gomega.Expect(err).Should(gomega.HaveOccurred())
gomega.Expect(client).Should(gomega.BeNil())
}
Expand Down
4 changes: 2 additions & 2 deletions db/keyval/redis/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ type PoolConfig struct {
IdleCheckFrequency time.Duration `json:"idle-check-frequency"`
}

// CreateClient creates an appropriate client according to the configuration
// ConfigToClient creates an appropriate client according to the configuration
// parameter.
func CreateClient(config interface{}) (Client, error) {
func ConfigToClient(config interface{}) (Client, error) {
switch cfg := config.(type) {
case NodeConfig:
return CreateNodeClient(cfg)
Expand Down
101 changes: 42 additions & 59 deletions db/keyval/redis/plugin_impl_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,23 @@
package redis

import (
"github.com/ligato/cn-infra/core"
"github.com/ligato/cn-infra/db/keyval/plugin"
"github.com/ligato/cn-infra/db/keyval"
"github.com/ligato/cn-infra/db/keyval/kvproto"
"github.com/ligato/cn-infra/flavors/local"
"github.com/ligato/cn-infra/health/statuscheck"
"github.com/ligato/cn-infra/utils/safeclose"
)

const (
// healthCheckProbeKey is a key used to probe Redis state.
healthCheckProbeKey string = "probe-redis-connection"
healthCheckProbeKey = "probe-redis-connection"
)

// Plugin implements redis plugin.
type Plugin struct {
Deps
*plugin.Skeleton
disabled bool
connection *BytesConnectionRedis
disabled bool
connection *BytesConnectionRedis
protoWrapper *kvproto.ProtoWrapper
}

// Deps lists dependencies of the redis plugin.
Expand All @@ -45,89 +44,73 @@ type Deps struct {
// If the configuration file doesn't exist or cannot be read, the returned error
// will be of os.PathError type. An untyped error is returned in case the file
// doesn't contain a valid YAML configuration.
func (p *Plugin) Init() error {
cfg, err := p.retrieveConfig()
if err != nil {
return err
}
if p.disabled {
return nil
}

client, err := CreateClient(cfg)
if err != nil {
func (plugin *Plugin) Init() (err error) {
redisCfg, err := plugin.getRedisConfig()
if err != nil || plugin.disabled {
return err
}

p.connection, err = NewBytesConnection(client, p.Log)
// Create client according to config
client, err := ConfigToClient(redisCfg)
if err != nil {
return err
}

p.Skeleton = plugin.NewSkeleton(string(p.PluginName), p.ServiceLabel, p.connection)
err = p.Skeleton.Init()
// Uses config file to establish connection with the database
plugin.connection, err = NewBytesConnection(client, plugin.Log)
if err != nil {
return err
}
plugin.protoWrapper = kvproto.NewProtoWrapperWithSerializer(plugin.connection, &keyval.SerializerJSON{})

// Register for providing status reports (polling mode)
if p.StatusCheck != nil {
p.StatusCheck.Register(core.PluginName(p.String()), func() (statuscheck.PluginState, error) {
_, _, err := p.Skeleton.NewBroker("/").GetValue(healthCheckProbeKey, nil)
if plugin.StatusCheck != nil {
plugin.StatusCheck.Register(plugin.PluginName, func() (statuscheck.PluginState, error) {
_, _, err := plugin.NewBroker("/").GetValue(healthCheckProbeKey, nil)
if err == nil {
return statuscheck.OK, nil
}
return statuscheck.Error, err
})
} else {
p.Log.Warnf("Unable to start status check for redis")
plugin.Log.Warnf("Unable to start status check for redis")
}

return nil
}

// AfterInit is called by the Agent Core after all plugins have been initialized.
func (p *Plugin) AfterInit() error {
if p.disabled {
return nil
}

// Close does nothing for redis plugin.
func (plugin *Plugin) Close() error {
return nil
}

// Close shutdowns the connection to redis.
func (p *Plugin) Close() error {
_, err := safeclose.CloseAll(p.Skeleton)
return err
// NewBroker creates new instance of prefixed broker that provides API with arguments of type proto.Message.
func (plugin *Plugin) NewBroker(keyPrefix string) keyval.ProtoBroker {
return plugin.protoWrapper.NewBroker(keyPrefix)
}

// NewWatcher creates new instance of prefixed broker that provides API with arguments of type proto.Message.
func (plugin *Plugin) NewWatcher(keyPrefix string) keyval.ProtoWatcher {
return plugin.protoWrapper.NewWatcher(keyPrefix)
}

func (p *Plugin) retrieveConfig() (cfg interface{}, err error) {
found, _ := p.PluginConfig.GetValue(&struct{}{})
// Disabled returns *true* if the plugin is not in use due to missing
// redis configuration.
func (plugin *Plugin) Disabled() (disabled bool) {
return plugin.disabled
}

func (plugin *Plugin) getRedisConfig() (cfg interface{}, err error) {
found, _ := plugin.PluginConfig.GetValue(&struct{}{})
if !found {
p.Log.Info("redis config not found ", p.PluginConfig.GetConfigName(), " - skip loading this plugin")
p.disabled = true
return nil, nil
plugin.Log.Info("Redis config not found, skip loading this plugin")
plugin.disabled = true
return
}
configFile := p.PluginConfig.GetConfigName()
configFile := plugin.PluginConfig.GetConfigName()
if configFile != "" {
cfg, err = LoadConfig(configFile)
if err != nil {
return nil, err
return
}
}
return cfg, nil
}

// String returns Deps.PluginName if set, "redis-client" otherwise.
func (p *Plugin) String() string {
if len(p.Deps.PluginName) == 0 {
return "redis-client"
}
return string(p.Deps.PluginName)
}

// Disabled returns *true* if the plugin is not in use due to missing
// redis configuration.
func (p *Plugin) Disabled() (disabled bool) {
return p.disabled
return
}
2 changes: 1 addition & 1 deletion examples/etcdv3-lib/editor/editor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func processArgs() (cfg *etcdv3.ClientConfig, op int, data []string, err error)
if err != nil {
return
}
cfg, err = etcdv3.ConfigToClientv3(fileConfig)
cfg, err = etcdv3.ConfigToClient(fileConfig)
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion examples/etcdv3-lib/view/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func processArgs() (*etcdv3.ClientConfig, error) {
}
}

return etcdv3.ConfigToClientv3(fileConfig)
return etcdv3.ConfigToClient(fileConfig)
}

func printUsage() {
Expand Down
2 changes: 1 addition & 1 deletion examples/etcdv3-lib/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func processArgs() (*etcdv3.ClientConfig, error) {
}
}

return etcdv3.ConfigToClientv3(fileConfig)
return etcdv3.ConfigToClient(fileConfig)
}

func printUsage() {
Expand Down
2 changes: 1 addition & 1 deletion examples/redis-lib/airport/airport.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func loadConfig() interface{} {
}

func createConnection(cfg interface{}) *redis.BytesConnectionRedis {
client, err := redis.CreateClient(cfg)
client, err := redis.ConfigToClient(cfg)
if err != nil {
log.Panicf("CreateNodeClient() failed: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/redis-lib/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func loadConfig() interface{} {
}

func createConnection(cfg interface{}) *redis.BytesConnectionRedis {
client, err := redis.CreateClient(cfg)
client, err := redis.ConfigToClient(cfg)
if err != nil {
log.Panicf("CreateNodeClient() failed: %s", err)
}
Expand Down

0 comments on commit 90b9b59

Please sign in to comment.