Skip to content

Commit

Permalink
feat: support polaris service governance
Browse files Browse the repository at this point in the history
  • Loading branch information
shenqidebaozi committed Dec 20, 2022
1 parent facafba commit b67d514
Show file tree
Hide file tree
Showing 11 changed files with 1,837 additions and 12 deletions.
20 changes: 14 additions & 6 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,16 @@ func (a *App) Run() error {
})
}
wg.Wait()
if a.opts.registrar != nil {
if a.opts.registrars != nil {
rctx, rcancel := context.WithTimeout(ctx, a.opts.registrarTimeout)
defer rcancel()
if err = a.opts.registrar.Register(rctx, instance); err != nil {
return err

for _, registrar := range a.opts.registrars {
if err = registrar.Register(rctx, instance); err != nil {
return err
}
}

}
for _, fn := range a.opts.afterStart {
if err = fn(sctx); err != nil {
Expand Down Expand Up @@ -155,12 +159,16 @@ func (a *App) Stop() (err error) {
a.mu.Lock()
instance := a.instance
a.mu.Unlock()
if a.opts.registrar != nil && instance != nil {
if a.opts.registrars != nil && instance != nil {
ctx, cancel := context.WithTimeout(NewContext(a.ctx, a), a.opts.registrarTimeout)
defer cancel()
if err = a.opts.registrar.Deregister(ctx, instance); err != nil {
return err

for _, registrar := range a.opts.registrars {
if err = registrar.Deregister(ctx, instance); err != nil {
return err
}
}

}
if a.cancel != nil {
a.cancel()
Expand Down
172 changes: 172 additions & 0 deletions contrib/polaris/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package polaris

import (
"errors"
"fmt"
"path/filepath"
"strings"

"github.com/polarismesh/polaris-go"
"github.com/polarismesh/polaris-go/pkg/model"

"github.com/go-kratos/kratos/v2/config"
"github.com/go-kratos/kratos/v2/log"
)

// ConfigOption is polaris config option.
type ConfigOption func(o *configOptions)

type configOptions struct {
namespace string
fileGroup string
fileName string
configFile polaris.ConfigFile
}

// WithConfigNamespace with polaris config namespace
func WithConfigNamespace(namespace string) ConfigOption {
return func(o *configOptions) {
o.namespace = namespace
}
}

// WithFileGroup with polaris config fileGroup
func WithFileGroup(fileGroup string) ConfigOption {
return func(o *configOptions) {
o.fileGroup = fileGroup
}
}

// WithFileName with polaris config fileName
func WithFileName(fileName string) ConfigOption {
return func(o *configOptions) {
o.fileName = fileName
}
}

type source struct {
client polaris.ConfigAPI
options *configOptions
}

func (p *Polaris) Config(opts ...ConfigOption) (config.Source, error) {
options := &configOptions{
namespace: "default",
fileGroup: "",
fileName: "",
}

for _, opt := range opts {
opt(options)
}

if options.fileGroup == "" {
return nil, errors.New("fileGroup invalid")
}

if options.fileName == "" {
return nil, errors.New("fileName invalid")
}

return &source{
client: p.config,
options: options,
}, nil
}

// Load return the config values
func (s *source) Load() ([]*config.KeyValue, error) {
configFile, err := s.client.GetConfigFile(s.options.namespace, s.options.fileGroup, s.options.fileName)
if err != nil {
fmt.Println("fail to get config.", err)
return nil, err
}

if err != nil {
return nil, err
}
content := configFile.GetContent()
k := s.options.fileName

s.options.configFile = configFile

return []*config.KeyValue{
{
Key: k,
Value: []byte(content),
Format: strings.TrimPrefix(filepath.Ext(k), "."),
},
}, nil
}

// Watch return the watcher
func (s *source) Watch() (config.Watcher, error) {
return newConfigWatcher(s.options.configFile), nil
}

type ConfigWatcher struct {
configFile polaris.ConfigFile
fullPath string
}

type eventChan struct {
closed bool
event chan model.ConfigFileChangeEvent
}

var eventChanMap = make(map[string]eventChan)

func getFullPath(namespace string, fileGroup string, fileName string) string {
return fmt.Sprintf("%s/%s/%s", namespace, fileGroup, fileName)
}

func receive(event model.ConfigFileChangeEvent) {
meta := event.ConfigFileMetadata
ec := eventChanMap[getFullPath(meta.GetNamespace(), meta.GetFileGroup(), meta.GetFileName())]
defer func() {
if err := recover(); err != nil {
log.Error(err)
}
}()
if !ec.closed {
ec.event <- event
}
}

func newConfigWatcher(configFile polaris.ConfigFile) *ConfigWatcher {
configFile.AddChangeListener(receive)

fullPath := getFullPath(configFile.GetNamespace(), configFile.GetFileGroup(), configFile.GetFileName())
if _, ok := eventChanMap[fullPath]; !ok {
eventChanMap[fullPath] = eventChan{
closed: false,
event: make(chan model.ConfigFileChangeEvent),
}
}
w := &ConfigWatcher{
configFile: configFile,
fullPath: fullPath,
}
return w
}

func (w *ConfigWatcher) Next() ([]*config.KeyValue, error) {
ec := eventChanMap[w.fullPath]
event := <-ec.event
return []*config.KeyValue{
{
Key: w.configFile.GetFileName(),
Value: []byte(event.NewValue),
Format: strings.TrimPrefix(filepath.Ext(w.configFile.GetFileName()), "."),
},
}, nil
}

func (w *ConfigWatcher) Stop() error {
ec := eventChanMap[w.fullPath]
if !ec.closed {
ec.closed = true
close(ec.event)
}
return nil
}
Loading

0 comments on commit b67d514

Please sign in to comment.