diff --git a/apollo/client.go b/apollo/client.go index 7069f53..cbbf996 100644 --- a/apollo/client.go +++ b/apollo/client.go @@ -2,34 +2,126 @@ package apollo import ( "context" - "crypto/tls" - - "github.com/libgox/addr" - "golang.org/x/exp/slog" + "encoding/json" + "fmt" + "net/http" ) type Client interface { -} + GetStringValue(namespace, key string) string -type innerClient struct { - ctx context.Context + SubscribeEvent(listener Listener) } -type Config struct { - Address addr.Address - // TlsConfig configuration information for tls. - TlsConfig *tls.Config - // Logger structured logger for logging operations - Logger *slog.Logger +type innerClient struct { + ctx context.Context + config *Config + storage *storage + poller *longPoll + listener Listener } func NewClient(config *Config) (Client, error) { if config.Logger != nil { - config.Logger = slog.Default() + SetLogger(config.Logger) } ctx := context.Background() c := &innerClient{ - ctx: ctx, + ctx: ctx, + config: config, + storage: newStorage(config.NamespaceNames), } + c.poller = newLongPoll(config, c.updateHandle) + + // sync + err := c.poller.fetch(c.ctx) + if err != nil { + return nil, err + } + + // long poll + go c.poller.start(c.ctx) + return c, nil } + +func (i *innerClient) updateHandle(notification *notification) error { + change, err := i.sync(notification) + if err != nil { + return err + } + if change == nil || len(change.Changes) == 0 { + return fmt.Errorf("no changes to sync") + } + if i.listener != nil { + i.listener.OnChange(change) + } + return nil +} + +func (i *innerClient) sync(notification *notification) (*ChangeEvent, error) { + log.Infof("sync namespace %s with remote config server", notification.NamespaceName) + url := i.config.GetSyncURI(notification.NamespaceName) + r := &requester{ + client: &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: i.config.TLSConfig, + }, + }, + retries: 3, + } + result, err := r.do(i.ctx, url, r.retries) + if err != nil || len(result) == 0 { + return nil, err + } + + ac := &apolloConfiguration{} + if err = json.Unmarshal(result, ac); err != nil { + return nil, err + } + return i.updateCache(ac) +} + +func (i *innerClient) updateCache(ac *apolloConfiguration) (*ChangeEvent, error) { + var change = &ChangeEvent{ + Namespace: ac.NamespaceName, + Changes: make(map[string]*Change), + } + c := i.storage.loadCache(ac.NamespaceName) + + c.data.Range(func(k, v interface{}) bool { + key := k.(string) + value := v.(string) + if _, ok := ac.Configurations[key]; !ok { + c.data.Delete(key) + change.Changes[key] = onDelete(key, value) + } + return true + }) + + for k, v := range ac.Configurations { + old, ok := c.data.Load(k) + if !ok { + change.Changes[k] = onAdd(k, v) + c.data.Store(k, v) + continue + } + if old.(string) != v { + change.Changes[k] = onModify(k, old.(string), v) + } + c.data.Store(k, v) + } + return change, nil +} + +func (i *innerClient) SubscribeEvent(listener Listener) { + i.listener = listener +} + +func (i *innerClient) GetStringValue(namespace string, key string) string { + v, ok := i.storage.loadCache(namespace).data.Load(key) + if !ok { + return "" + } + return v.(string) +} diff --git a/apollo/client_test.go b/apollo/client_test.go new file mode 100644 index 0000000..c4b87fa --- /dev/null +++ b/apollo/client_test.go @@ -0,0 +1,35 @@ +package apollo + +import ( + "testing" + "time" + + "github.com/libgox/addr" +) + +func TestNewClient(t *testing.T) { + c, err := NewClient(&Config{ + AppID: "SampleApp", + Cluster: "default", + NamespaceNames: []string{"application", "application2"}, + Address: addr.Address{ + Host: "localhost", + Port: 8080, + }, + Secret: "", + TLSConfig: nil, + Logger: nil, + }) + if err == nil { + value := c.GetStringValue("application", "timeout") + value2 := c.GetStringValue("application2", "timeout") + c.SubscribeEvent(&ClientTest{}) + t.Log(value, ",", value2) + } + time.Sleep(100 * time.Second) +} + +type ClientTest struct{} + +func (c *ClientTest) OnChange(event *ChangeEvent) { +} diff --git a/apollo/config.go b/apollo/config.go new file mode 100644 index 0000000..69dcbb8 --- /dev/null +++ b/apollo/config.go @@ -0,0 +1,47 @@ +package apollo + +import ( + "crypto/tls" + "fmt" + "net/url" + + "github.com/libgox/addr" +) + +type Config struct { + AppID string + Cluster string + NamespaceNames []string + Address addr.Address + Secret string + // TlsConfig configuration information for tls. + TLSConfig *tls.Config + Logger Logger +} + +func (c *Config) GetNotifyURLSuffix(notifications string) string { + return fmt.Sprintf("%s/notifications/v2?appId=%s&cluster=%s¬ifications=%s", + c.GetUrlPrefix(), + url.QueryEscape(c.AppID), + url.QueryEscape(c.Cluster), + url.QueryEscape(notifications)) +} + +func (c *Config) GetSyncURI(namespace string) string { + return fmt.Sprintf("%s/configs/%s/%s/%s?releaseKey=&ip=%s", + c.GetUrlPrefix(), + url.QueryEscape(c.AppID), + url.QueryEscape(c.Cluster), + url.QueryEscape(namespace), + GetLocalIP()) +} + +func (c *Config) GetUrlPrefix() string { + var urlPrefix string + if c.TLSConfig != nil { + urlPrefix = "https://" + c.Address.Addr() + } else { + urlPrefix = "http://" + c.Address.Addr() + } + return urlPrefix +} diff --git a/apollo/event.go b/apollo/event.go new file mode 100644 index 0000000..f5121a0 --- /dev/null +++ b/apollo/event.go @@ -0,0 +1,53 @@ +package apollo + +type ChangeType int + +const ( + ADD ChangeType = iota + + MODIFY + + DELETE +) + +type Listener interface { + OnChange(event *ChangeEvent) +} + +type Change struct { + Key string + OldValue string + NewValue string + ChangeType ChangeType +} + +type ChangeEvent struct { + Namespace string + NotificationID int + Changes map[string]*Change +} + +func onDelete(key, value string) *Change { + return &Change{ + Key: key, + ChangeType: DELETE, + OldValue: value, + } +} + +func onModify(key, oldValue, newValue string) *Change { + return &Change{ + Key: key, + ChangeType: MODIFY, + OldValue: oldValue, + NewValue: newValue, + } +} + +func onAdd(key, value string) *Change { + return &Change{ + Key: key, + ChangeType: ADD, + NewValue: value, + } +} diff --git a/apollo/logger.go b/apollo/logger.go new file mode 100644 index 0000000..72f23c2 --- /dev/null +++ b/apollo/logger.go @@ -0,0 +1,57 @@ +package apollo + +import ( + "fmt" + + "golang.org/x/exp/slog" +) + +type Logger interface { + Info(format string, args ...interface{}) + + Error(format string, args ...interface{}) + + Warn(format string, args ...interface{}) + + Infof(format string, args ...interface{}) + + Errorf(format string, args ...interface{}) + + Warnf(format string, args ...interface{}) +} + +type defaultLogger struct { + Logger *slog.Logger +} + +var log Logger = &defaultLogger{ + Logger: slog.Default(), +} + +func SetLogger(logger Logger) { + log = logger +} + +func (d *defaultLogger) Info(format string, args ...interface{}) { + d.Logger.Info(format, args...) +} + +func (d *defaultLogger) Error(format string, args ...interface{}) { + d.Logger.Error(format, args...) +} + +func (d *defaultLogger) Warn(format string, args ...interface{}) { + d.Logger.Warn(format, args...) +} + +func (d *defaultLogger) Infof(format string, args ...interface{}) { + d.Logger.Info(fmt.Sprintf(format, args...)) +} + +func (d *defaultLogger) Errorf(format string, args ...interface{}) { + d.Logger.Error(fmt.Sprintf(format, args...)) +} + +func (d *defaultLogger) Warnf(format string, args ...interface{}) { + d.Logger.Warn(fmt.Sprintf(format, args...)) +} diff --git a/apollo/notification.go b/apollo/notification.go new file mode 100644 index 0000000..8c9683d --- /dev/null +++ b/apollo/notification.go @@ -0,0 +1,49 @@ +package apollo + +import ( + "encoding/json" + "sync" +) + +const defaultNotificationID int = -1 + +type notificationsMgr struct { + notifications sync.Map +} + +type notification struct { + NamespaceName string `json:"namespaceName"` + NotificationID int `json:"notificationId"` +} + +func newNotificationManager(namespaceNames []string) *notificationsMgr { + n := ¬ificationsMgr{ + notifications: sync.Map{}, + } + for _, namespaceName := range namespaceNames { + n.notifications.Store(namespaceName, defaultNotificationID) + } + return n +} + +func (n *notificationsMgr) String() string { + var notifications []*notification + n.notifications.Range(func(key, value interface{}) bool { + k, _ := key.(string) + v, _ := value.(int) + notifications = append(notifications, ¬ification{ + NamespaceName: k, + NotificationID: v, + }) + return true + }) + res, err := json.Marshal(¬ifications) + if err != nil { + return "" + } + return string(res) +} + +func (n *notificationsMgr) Store(namespaceName string, notificationID int) { + n.notifications.Store(namespaceName, notificationID) +} diff --git a/apollo/poll.go b/apollo/poll.go new file mode 100644 index 0000000..f91afb6 --- /dev/null +++ b/apollo/poll.go @@ -0,0 +1,110 @@ +package apollo + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +const defaultLongPollInterval = time.Second * 2 + +type notifyHandler func(n *notification) error + +type requester struct { + client *http.Client + retries int +} + +func (r *requester) do(ctx context.Context, uri string, retries int) ([]byte, error) { + resp, err := r.client.Get(uri) + if err != nil { + if retries > 0 { + return r.do(ctx, uri, retries-1) + } + return nil, err + } + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + log.Errorf("failed to close response body: %v", err) + return + } + }(resp.Body) + + if resp.StatusCode == http.StatusOK { + return io.ReadAll(resp.Body) + } + + return nil, fmt.Errorf("apollo return http resp code %d", resp.StatusCode) +} + +type longPoll struct { + config *Config + interval time.Duration + handler notifyHandler + requester *requester + notification *notificationsMgr +} + +func newLongPoll(config *Config, handler notifyHandler) *longPoll { + p := &longPoll{ + config: config, + interval: defaultLongPollInterval, + handler: handler, + requester: &requester{ + client: &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: config.TLSConfig, + }, + }, + retries: 3, + }, + notification: newNotificationManager(config.NamespaceNames), + } + return p +} + +func (p *longPoll) start(ctx context.Context) { + child, cancel := context.WithCancel(ctx) + defer cancel() + + timer := time.NewTimer(p.interval) + defer timer.Stop() + + for { + select { + case <-timer.C: + err := p.fetch(child) + log.Errorf("fetch config err: %v", err) + timer.Reset(p.interval) + case <-child.Done(): + return + } + } +} + +func (p *longPoll) fetch(ctx context.Context) error { + url := p.config.GetNotifyURLSuffix(p.notification.String()) + result, err := p.requester.do(ctx, url, p.requester.retries) + if err != nil { + return err + } + if len(result) == 0 { + log.Warn("apollo get notify result empty") + return nil + } + var n []*notification + if err := json.Unmarshal(result, &n); err != nil { + return err + } + for _, v := range n { + if err := p.handler(v); err != nil { + return err + } + p.notification.Store(v.NamespaceName, v.NotificationID) + } + return nil +} diff --git a/apollo/storage.go b/apollo/storage.go new file mode 100644 index 0000000..84fc53a --- /dev/null +++ b/apollo/storage.go @@ -0,0 +1,45 @@ +package apollo + +import ( + "sync" +) + +// storage namespace cache +type storage struct { + caches sync.Map +} + +func newStorage(namespaceNames []string) *storage { + s := &storage{ + caches: sync.Map{}, + } + for _, namespace := range namespaceNames { + s.caches.Store(namespace, &cache{ + data: sync.Map{}, + }) + } + return s +} + +func (s *storage) loadCache(namespace string) *cache { + if value, ok := s.caches.Load(namespace); ok { + return value.(*cache) + } + c := &cache{ + data: sync.Map{}, + } + s.caches.Store(namespace, c) + return c +} + +// apolloConfiguration query config result +type apolloConfiguration struct { + NamespaceName string `json:"namespaceName"` + Configurations map[string]string `json:"configurations"` + ReleaseKey string `json:"releaseKey"` +} + +// cache apollo namespace configuration cache +type cache struct { + data sync.Map +} diff --git a/apollo/utils.go b/apollo/utils.go new file mode 100644 index 0000000..6c2438f --- /dev/null +++ b/apollo/utils.go @@ -0,0 +1,23 @@ +package apollo + +import "net" + +func GetLocalIP() string { + ips, err := net.InterfaceAddrs() + if err != nil { + return "" + } + for _, ip := range ips { + if ip4 := toIP4(ip); ip4 != nil { + return ip4.String() + } + } + return "" +} + +func toIP4(addr net.Addr) net.IP { + if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { + return ipNet.IP.To4() + } + return nil +}