diff --git a/.golangci.yml b/.golangci.yml index 7422131b17..b1c29bafc9 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -23,3 +23,11 @@ linters-settings: govet: disable: - unsafeptr + goimports: + local-prefixes: github.com/apache/apisix-ingress-controller + +linters: + enable: + - goimports + - govet + - gofmt diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go index 55733de04b..84f152d7bd 100644 --- a/pkg/apisix/apisix.go +++ b/pkg/apisix/apisix.go @@ -12,6 +12,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package apisix import ( @@ -26,9 +27,9 @@ type APISIX interface { // Cluster specifies the target cluster to talk. Cluster(string) Cluster // AddCluster adds a new cluster. - AddCluster(*ClusterOptions) error + AddCluster(context.Context, *ClusterOptions) error // UpdateCluster updates an existing cluster. - UpdateCluster(*ClusterOptions) error + UpdateCluster(context.Context, *ClusterOptions) error // ListClusters lists all APISIX clusters. ListClusters() []Cluster } @@ -54,6 +55,10 @@ type Cluster interface { Consumer() Consumer // HealthCheck checks apisix cluster health in realtime. HealthCheck(context.Context) error + // Plugin returns a Plugin interface that can operate Plugin resources. + Plugin() Plugin + // Schema returns a Schema interface that can fetch schema of APISIX objects. + Schema() Schema } // Route is the specific client interface to take over the create, update, @@ -106,7 +111,7 @@ type GlobalRule interface { Update(context.Context, *v1.GlobalRule) (*v1.GlobalRule, error) } -// Consumer it the specific client interface to take over the create, update, +// Consumer is the specific client interface to take over the create, update, // list and delete for APISIX Consumer resource. type Consumer interface { Get(context.Context, string) (*v1.Consumer, error) @@ -116,6 +121,16 @@ type Consumer interface { Update(context.Context, *v1.Consumer) (*v1.Consumer, error) } +// Plugin is the specific client interface to fetch APISIX Plugin resource. +type Plugin interface { + List(context.Context) ([]string, error) +} + +// Schema is the specific client interface to fetch the schema of APISIX objects. +type Schema interface { + GetPluginSchema(context.Context, string) (*v1.Schema, error) +} + type apisix struct { mu sync.RWMutex nonExistentCluster Cluster @@ -154,14 +169,14 @@ func (c *apisix) ListClusters() []Cluster { } // AddCluster implements APISIX.AddCluster method. 
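With the interface changes above, callers now thread a context through cluster registration, and the new SyncInterval option controls how often plugin schemas are re-synced. Below is a minimal, hedged sketch of registering the default cluster from outside the package; the helper name, URL, admin key, and interval are illustrative placeholders, and the import grouping follows the goimports local-prefixes rule added to .golangci.yml.

package example

import (
    "context"
    "time"

    "github.com/apache/apisix-ingress-controller/pkg/apisix"
    "github.com/apache/apisix-ingress-controller/pkg/types"
)

// setupDefaultCluster registers the default APISIX cluster. Cancelling ctx
// stops the background cache and schema sync goroutines started for it.
func setupDefaultCluster(ctx context.Context) (apisix.APISIX, error) {
    client, err := apisix.NewClient()
    if err != nil {
        return nil, err
    }
    err = client.AddCluster(ctx, &apisix.ClusterOptions{
        Name:         "default",
        BaseURL:      "http://127.0.0.1:9080/apisix/admin", // placeholder endpoint
        AdminKey:     "edd1c9f034335f136f87ad84b625c8f1",   // placeholder admin key
        Timeout:      5 * time.Second,
        SyncInterval: types.TimeDuration{Duration: 30 * time.Minute}, // overrides the 6h default
    })
    // Mirrors pkg/ingress/controller.go: a duplicate registration is tolerated.
    if err != nil && err != apisix.ErrDuplicatedCluster {
        return nil, err
    }
    return client, nil
}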
-func (c *apisix) AddCluster(co *ClusterOptions) error { +func (c *apisix) AddCluster(ctx context.Context, co *ClusterOptions) error { c.mu.Lock() defer c.mu.Unlock() _, ok := c.clusters[co.Name] if ok { return ErrDuplicatedCluster } - cluster, err := newCluster(co) + cluster, err := newCluster(ctx, co) if err != nil { return err } @@ -169,14 +184,14 @@ func (c *apisix) AddCluster(co *ClusterOptions) error { return nil } -func (c *apisix) UpdateCluster(co *ClusterOptions) error { +func (c *apisix) UpdateCluster(ctx context.Context, co *ClusterOptions) error { c.mu.Lock() defer c.mu.Unlock() if _, ok := c.clusters[co.Name]; !ok { return ErrClusterNotExist } - cluster, err := newCluster(co) + cluster, err := newCluster(ctx, co) if err != nil { return err } diff --git a/pkg/apisix/cache/cache.go b/pkg/apisix/cache/cache.go index 48fc93b081..e14857fed1 100644 --- a/pkg/apisix/cache/cache.go +++ b/pkg/apisix/cache/cache.go @@ -35,6 +35,8 @@ type Cache interface { InsertGlobalRule(*v1.GlobalRule) error // InsertConsumer adds or updates consumer to cache. InsertConsumer(*v1.Consumer) error + // InsertSchema adds or updates schema to cache. + InsertSchema(*v1.Schema) error // GetRoute finds the route from cache according to the primary index (id). GetRoute(string) (*v1.Route, error) @@ -48,6 +50,8 @@ type Cache interface { GetGlobalRule(string) (*v1.GlobalRule, error) // GetConsumer finds the consumer from cache according to the primary index (id). GetConsumer(string) (*v1.Consumer, error) + // GetSchema finds the scheme from cache according to the primary index (id). + GetSchema(string) (*v1.Schema, error) // ListRoutes lists all routes in cache. ListRoutes() ([]*v1.Route, error) @@ -61,6 +65,8 @@ type Cache interface { ListGlobalRules() ([]*v1.GlobalRule, error) // ListConsumers lists all consumer objects in cache. ListConsumers() ([]*v1.Consumer, error) + // ListSchema lists all schema in cache. + ListSchema() ([]*v1.Schema, error) // DeleteRoute deletes the specified route in cache. DeleteRoute(*v1.Route) error @@ -74,4 +80,6 @@ type Cache interface { DeleteGlobalRule(*v1.GlobalRule) error // DeleteConsumer deletes the specified consumer in cache. DeleteConsumer(*v1.Consumer) error + // DeleteSchema deletes the specified schema in cache. 
+ DeleteSchema(*v1.Schema) error } diff --git a/pkg/apisix/cache/memdb.go b/pkg/apisix/cache/memdb.go index 4502034ced..17f9cd38fa 100644 --- a/pkg/apisix/cache/memdb.go +++ b/pkg/apisix/cache/memdb.go @@ -70,6 +70,10 @@ func (c *dbCache) InsertConsumer(consumer *v1.Consumer) error { return c.insert("consumer", consumer.DeepCopy()) } +func (c *dbCache) InsertSchema(schema *v1.Schema) error { + return c.insert("schema", schema.DeepCopy()) +} + func (c *dbCache) insert(table string, obj interface{}) error { txn := c.db.Txn(true) defer txn.Abort() @@ -128,6 +132,14 @@ func (c *dbCache) GetConsumer(username string) (*v1.Consumer, error) { return obj.(*v1.Consumer).DeepCopy(), nil } +func (c *dbCache) GetSchema(name string) (*v1.Schema, error) { + obj, err := c.get("schema", name) + if err != nil { + return nil, err + } + return obj.(*v1.Schema).DeepCopy(), nil +} + func (c *dbCache) get(table, id string) (interface{}, error) { txn := c.db.Txn(false) defer txn.Abort() @@ -216,6 +228,18 @@ func (c *dbCache) ListConsumers() ([]*v1.Consumer, error) { return consumers, nil } +func (c *dbCache) ListSchema() ([]*v1.Schema, error) { + raws, err := c.list("schema") + if err != nil { + return nil, err + } + schemaList := make([]*v1.Schema, 0, len(raws)) + for _, raw := range raws { + schemaList = append(schemaList, raw.(*v1.Schema).DeepCopy()) + } + return schemaList, nil +} + func (c *dbCache) list(table string) ([]interface{}, error) { txn := c.db.Txn(false) defer txn.Abort() @@ -257,6 +281,10 @@ func (c *dbCache) DeleteConsumer(consumer *v1.Consumer) error { return c.delete("consumer", consumer) } +func (c *dbCache) DeleteSchema(schema *v1.Schema) error { + return c.delete("schema", schema) +} + func (c *dbCache) delete(table string, obj interface{}) error { txn := c.db.Txn(true) defer txn.Abort() diff --git a/pkg/apisix/cache/memdb_test.go b/pkg/apisix/cache/memdb_test.go index 46d7816662..ff06c9adda 100644 --- a/pkg/apisix/cache/memdb_test.go +++ b/pkg/apisix/cache/memdb_test.go @@ -345,3 +345,47 @@ func TestMemDBCacheConsumer(t *testing.T) { } assert.Error(t, ErrNotFound, c.DeleteConsumer(c4)) } + +func TestMemDBCacheSchema(t *testing.T) { + c, err := NewMemDBCache() + assert.Nil(t, err, "NewMemDBCache") + + s1 := &v1.Schema{ + Name: "plugins/p1", + Content: "plugin schema", + } + assert.Nil(t, c.InsertSchema(s1), "inserting schema s1") + + s11, err := c.GetSchema("plugins/p1") + assert.Nil(t, err) + assert.Equal(t, s1, s11) + + s2 := &v1.Schema{ + Name: "plugins/p2", + } + s3 := &v1.Schema{ + Name: "plugins/p3", + } + assert.Nil(t, c.InsertSchema(s2), "inserting schema s2") + assert.Nil(t, c.InsertSchema(s3), "inserting schema s3") + + s22, err := c.GetSchema("plugins/p2") + assert.Nil(t, err) + assert.Equal(t, s2, s22) + + assert.Nil(t, c.DeleteSchema(s3), "delete schema s3") + + schemaList, err := c.ListSchema() + assert.Nil(t, err, "listing schema") + + if schemaList[0].Name > schemaList[1].Name { + schemaList[0], schemaList[1] = schemaList[1], schemaList[0] + } + assert.Equal(t, schemaList[0], s1) + assert.Equal(t, schemaList[1], s2) + + s4 := &v1.Schema{ + Name: "plugins/p4", + } + assert.Error(t, ErrNotFound, c.DeleteSchema(s4)) +} diff --git a/pkg/apisix/cache/schema.go b/pkg/apisix/cache/schema.go index 9925d39d7e..6b9e8ba21c 100644 --- a/pkg/apisix/cache/schema.go +++ b/pkg/apisix/cache/schema.go @@ -106,6 +106,16 @@ var ( }, }, }, + "schema": { + Name: "schema", + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: 
"Name"}, + }, + }, + }, }, } ) diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go index baad0273e6..34e4492b93 100644 --- a/pkg/apisix/cluster.go +++ b/pkg/apisix/cluster.go @@ -12,6 +12,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package apisix import ( @@ -34,10 +35,12 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" "github.com/apache/apisix-ingress-controller/pkg/log" + "github.com/apache/apisix-ingress-controller/pkg/types" ) const ( - _defaultTimeout = 5 * time.Second + _defaultTimeout = 5 * time.Second + _defaultSyncInterval = 6 * time.Hour _cacheSyncing = iota _cacheSynced @@ -72,6 +75,8 @@ type ClusterOptions struct { AdminKey string BaseURL string Timeout time.Duration + // SyncInterval is the interval to sync schema. + SyncInterval types.TimeDuration } type cluster struct { @@ -90,15 +95,20 @@ type cluster struct { streamRoute StreamRoute globalRules GlobalRule consumer Consumer + plugin Plugin + schema Schema } -func newCluster(o *ClusterOptions) (Cluster, error) { +func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) { if o.BaseURL == "" { return nil, errors.New("empty base url") } if o.Timeout == time.Duration(0) { o.Timeout = _defaultTimeout } + if o.SyncInterval.Duration == time.Duration(0) { + o.SyncInterval = types.TimeDuration{Duration: _defaultSyncInterval} + } o.BaseURL = strings.TrimSuffix(o.BaseURL, "/") u, err := url.Parse(o.BaseURL) @@ -124,18 +134,21 @@ func newCluster(o *ClusterOptions) (Cluster, error) { c.streamRoute = newStreamRouteClient(c) c.globalRules = newGlobalRuleClient(c) c.consumer = newConsumerClient(c) + c.plugin = newPluginClient(c) + c.schema = newSchemaClient(c) c.cache, err = cache.NewMemDBCache() if err != nil { return nil, err } - go c.syncCache() + go c.syncCache(ctx) + go c.syncSchema(ctx, o.SyncInterval.Duration) return c, nil } -func (c *cluster) syncCache() { +func (c *cluster) syncCache(ctx context.Context) { log.Infow("syncing cache", zap.String("cluster", c.name)) now := time.Now() defer func() { @@ -161,7 +174,7 @@ func (c *cluster) syncCache() { err := wait.ExponentialBackoff(backoff, func() (done bool, _ error) { // impossibly return: false, nil // so can safe used - done, lastSyncErr = c.syncCacheOnce() + done, lastSyncErr = c.syncCacheOnce(ctx) return }) if err != nil { @@ -175,33 +188,33 @@ func (c *cluster) syncCache() { } } -func (c *cluster) syncCacheOnce() (bool, error) { - routes, err := c.route.List(context.TODO()) +func (c *cluster) syncCacheOnce(ctx context.Context) (bool, error) { + routes, err := c.route.List(ctx) if err != nil { log.Errorf("failed to list route in APISIX: %s", err) return false, err } - upstreams, err := c.upstream.List(context.TODO()) + upstreams, err := c.upstream.List(ctx) if err != nil { log.Errorf("failed to list upstreams in APISIX: %s", err) return false, err } - ssl, err := c.ssl.List(context.TODO()) + ssl, err := c.ssl.List(ctx) if err != nil { log.Errorf("failed to list ssl in APISIX: %s", err) return false, err } - streamRoutes, err := c.streamRoute.List(context.TODO()) + streamRoutes, err := c.streamRoute.List(ctx) if err != nil { log.Errorf("failed to list stream_routes in APISIX: %s", err) return false, err } - globalRules, err := c.globalRules.List(context.TODO()) + globalRules, err := c.globalRules.List(ctx) if err != nil { log.Errorf("failed to list global_rules in APISIX: %s", err) return 
false, err } - consumers, err := c.consumer.List(context.TODO()) + consumers, err := c.consumer.List(ctx) if err != nil { log.Errorf("failed to list consumers in APISIX: %s", err) return false, err @@ -301,6 +314,74 @@ func (c *cluster) HasSynced(ctx context.Context) error { } } +// syncSchema syncs schema from APISIX regularly according to the interval. +func (c *cluster) syncSchema(ctx context.Context, interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + if err := c.syncSchemaOnce(ctx); err != nil { + log.Warnf("failed to sync schema: %s", err) + } + + select { + case <-ticker.C: + continue + case <-ctx.Done(): + return + } + } +} + +// syncSchemaOnce syncs schema from APISIX once. +// It firstly deletes all the schema in the cache, +// then queries and inserts to the cache. +func (c *cluster) syncSchemaOnce(ctx context.Context) error { + log.Infow("syncing schema", zap.String("cluster", c.name)) + + schemaList, err := c.cache.ListSchema() + if err != nil { + log.Errorf("failed to list schema in the cache: %s", err) + return err + } + for _, s := range schemaList { + if err := c.cache.DeleteSchema(s); err != nil { + log.Warnw("failed to delete schema in cache", + zap.String("schemaName", s.Name), + zap.String("schemaContent", s.Content), + zap.String("error", err.Error()), + ) + } + } + + // update plugins' schema. + pluginList, err := c.plugin.List(ctx) + if err != nil { + log.Errorf("failed to list plugin names in APISIX: %s", err) + return err + } + for _, p := range pluginList { + ps, err := c.schema.GetPluginSchema(ctx, p) + if err != nil { + log.Warnw("failed to get plugin schema", + zap.String("plugin", p), + zap.String("error", err.Error()), + ) + continue + } + + if err := c.cache.InsertSchema(ps); err != nil { + log.Warnw("failed to insert schema to cache", + zap.String("plugin", p), + zap.String("cluster", c.name), + zap.String("error", err.Error()), + ) + continue + } + } + return nil +} + // Route implements Cluster.Route method. func (c *cluster) Route() Route { return c.route @@ -331,6 +412,16 @@ func (c *cluster) Consumer() Consumer { return c.consumer } +// Plugin implements Cluster.Plugin method. +func (c *cluster) Plugin() Plugin { + return c.plugin +} + +// Schema implements Cluster.Schema method. +func (c *cluster) Schema() Schema { + return c.schema +} + // HealthCheck implements Cluster.HealthCheck method. func (c *cluster) HealthCheck(ctx context.Context) (err error) { if c.cacheSyncErr != nil { @@ -550,3 +641,56 @@ func readBody(r io.ReadCloser, url string) string { } return string(data) } + +// getSchema returns the schema of APISIX object. +func (c *cluster) getSchema(ctx context.Context, url string) (string, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return "", err + } + resp, err := c.do(req) + if err != nil { + return "", err + } + defer drainBody(resp.Body, url) + if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusNotFound { + return "", cache.ErrNotFound + } else { + err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) + err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url))) + } + return "", err + } + + return readBody(resp.Body, url), nil +} + +// getList returns a list of string. 
+func (c *cluster) getList(ctx context.Context, url string) ([]string, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + resp, err := c.do(req) + if err != nil { + return nil, err + } + defer drainBody(resp.Body, url) + if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusNotFound { + return nil, cache.ErrNotFound + } else { + err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) + err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url))) + } + return nil, err + } + + var listResponse []string + dec := json.NewDecoder(resp.Body) + if err := dec.Decode(&listResponse); err != nil { + return nil, err + } + return listResponse, nil +} diff --git a/pkg/apisix/cluster_test.go b/pkg/apisix/cluster_test.go index 523b62e6cd..9f31e3e5f6 100644 --- a/pkg/apisix/cluster_test.go +++ b/pkg/apisix/cluster_test.go @@ -19,15 +19,16 @@ import ( "context" "testing" - v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" "github.com/stretchr/testify/assert" + + v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) func TestAddCluster(t *testing.T) { apisix, err := NewClient() assert.Nil(t, err) - err = apisix.AddCluster(&ClusterOptions{ + err = apisix.AddCluster(context.Background(), &ClusterOptions{ BaseURL: "http://service1:9080/apisix/admin", }) assert.Nil(t, err) @@ -35,13 +36,13 @@ func TestAddCluster(t *testing.T) { clusters := apisix.ListClusters() assert.Len(t, clusters, 1) - err = apisix.AddCluster(&ClusterOptions{ + err = apisix.AddCluster(context.Background(), &ClusterOptions{ Name: "service2", BaseURL: "http://service2:9080/apisix/admin", }) assert.Nil(t, err) - err = apisix.AddCluster(&ClusterOptions{ + err = apisix.AddCluster(context.Background(), &ClusterOptions{ Name: "service2", AdminKey: "http://service3:9080/apisix/admin", }) @@ -55,7 +56,7 @@ func TestNonExistentCluster(t *testing.T) { apisix, err := NewClient() assert.Nil(t, err) - err = apisix.AddCluster(&ClusterOptions{ + err = apisix.AddCluster(context.Background(), &ClusterOptions{ BaseURL: "http://service1:9080/apisix/admin", }) assert.Nil(t, err) diff --git a/pkg/apisix/nonexistentclient.go b/pkg/apisix/nonexistentclient.go index 160a571e24..d0d1d4d669 100644 --- a/pkg/apisix/nonexistentclient.go +++ b/pkg/apisix/nonexistentclient.go @@ -35,6 +35,8 @@ func newNonExistentCluster() *nonExistentCluster { streamRoute: &dummyStreamRoute{}, globalRule: &dummyGlobalRule{}, consumer: &dummyConsumer{}, + plugin: &dummyPlugin{}, + schema: &dummySchema{}, }, } } @@ -46,6 +48,8 @@ type embedDummyResourceImplementer struct { streamRoute StreamRoute globalRule GlobalRule consumer Consumer + plugin Plugin + schema Schema } type dummyRoute struct{} @@ -180,6 +184,18 @@ func (f *dummyConsumer) Update(_ context.Context, _ *v1.Consumer) (*v1.Consumer, return nil, ErrClusterNotExist } +type dummyPlugin struct{} + +func (f *dummyPlugin) List(_ context.Context) ([]string, error) { + return nil, ErrClusterNotExist +} + +type dummySchema struct{} + +func (f *dummySchema) GetPluginSchema(_ context.Context, _ string) (*v1.Schema, error) { + return nil, ErrClusterNotExist +} + func (nc *nonExistentCluster) Route() Route { return nc.route } @@ -204,6 +220,14 @@ func (nc *nonExistentCluster) Consumer() Consumer { return nc.consumer } +func (nc *nonExistentCluster) Plugin() Plugin { + return nc.plugin +} + +func (nc *nonExistentCluster) Schema() Schema { + return nc.schema +} + 
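Beyond the internal syncSchema loop, the new Plugin and Schema accessors are usable by any holder of a Cluster. A hedged sketch follows (the function name and printing are illustrative only); it mirrors what syncSchemaOnce does: list the plugin names, then fetch each plugin's schema, which getSchema also writes through to the cache.

package example

import (
    "context"
    "fmt"

    "github.com/apache/apisix-ingress-controller/pkg/apisix"
)

// dumpPluginSchemas prints the JSON schema of every plugin known to the
// given cluster, skipping plugins whose schema cannot be fetched.
func dumpPluginSchemas(ctx context.Context, c apisix.Cluster) error {
    names, err := c.Plugin().List(ctx)
    if err != nil {
        return fmt.Errorf("failed to list plugins: %w", err)
    }
    for _, name := range names {
        s, err := c.Schema().GetPluginSchema(ctx, name)
        if err != nil {
            // Tolerate individual failures, as the sync loop does.
            fmt.Printf("skipping %s: %s\n", name, err)
            continue
        }
        fmt.Printf("%s: %s\n", s.Name, s.Content)
    }
    return nil
}

Called against the nonExistentCluster placeholder above, Plugin().List simply returns ErrClusterNotExist and the helper bails out early.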
func (nc *nonExistentCluster) HasSynced(_ context.Context) error { return nil } @@ -226,21 +250,25 @@ func (c *dummyCache) InsertUpstream(_ *v1.Upstream) error { return func (c *dummyCache) InsertStreamRoute(_ *v1.StreamRoute) error { return nil } func (c *dummyCache) InsertGlobalRule(_ *v1.GlobalRule) error { return nil } func (c *dummyCache) InsertConsumer(_ *v1.Consumer) error { return nil } +func (c *dummyCache) InsertSchema(_ *v1.Schema) error { return nil } func (c *dummyCache) GetRoute(_ string) (*v1.Route, error) { return nil, cache.ErrNotFound } func (c *dummyCache) GetSSL(_ string) (*v1.Ssl, error) { return nil, cache.ErrNotFound } func (c *dummyCache) GetUpstream(_ string) (*v1.Upstream, error) { return nil, cache.ErrNotFound } func (c *dummyCache) GetStreamRoute(_ string) (*v1.StreamRoute, error) { return nil, cache.ErrNotFound } func (c *dummyCache) GetGlobalRule(_ string) (*v1.GlobalRule, error) { return nil, cache.ErrNotFound } func (c *dummyCache) GetConsumer(_ string) (*v1.Consumer, error) { return nil, cache.ErrNotFound } +func (c *dummyCache) GetSchema(_ string) (*v1.Schema, error) { return nil, cache.ErrNotFound } func (c *dummyCache) ListRoutes() ([]*v1.Route, error) { return nil, nil } func (c *dummyCache) ListSSL() ([]*v1.Ssl, error) { return nil, nil } func (c *dummyCache) ListUpstreams() ([]*v1.Upstream, error) { return nil, nil } func (c *dummyCache) ListStreamRoutes() ([]*v1.StreamRoute, error) { return nil, nil } func (c *dummyCache) ListGlobalRules() ([]*v1.GlobalRule, error) { return nil, nil } func (c *dummyCache) ListConsumers() ([]*v1.Consumer, error) { return nil, nil } +func (c *dummyCache) ListSchema() ([]*v1.Schema, error) { return nil, nil } func (c *dummyCache) DeleteRoute(_ *v1.Route) error { return nil } func (c *dummyCache) DeleteSSL(_ *v1.Ssl) error { return nil } func (c *dummyCache) DeleteUpstream(_ *v1.Upstream) error { return nil } func (c *dummyCache) DeleteStreamRoute(_ *v1.StreamRoute) error { return nil } func (c *dummyCache) DeleteGlobalRule(_ *v1.GlobalRule) error { return nil } func (c *dummyCache) DeleteConsumer(_ *v1.Consumer) error { return nil } +func (c *dummyCache) DeleteSchema(_ *v1.Schema) error { return nil } diff --git a/pkg/apisix/plugin.go b/pkg/apisix/plugin.go new file mode 100644 index 0000000000..03acce874f --- /dev/null +++ b/pkg/apisix/plugin.go @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apisix + +import ( + "context" + + "go.uber.org/zap" + + "github.com/apache/apisix-ingress-controller/pkg/log" +) + +type pluginClient struct { + url string + cluster *cluster +} + +func newPluginClient(c *cluster) Plugin { + return &pluginClient{ + url: c.baseURL + "/plugins", + cluster: c, + } +} + +// List returns the names of all plugins. 
+func (p *pluginClient) List(ctx context.Context) ([]string, error) { + log.Debugw("try to list plugins' names in APISIX", + zap.String("cluster", "default"), + zap.String("url", p.url), + ) + pluginList, err := p.cluster.getList(ctx, p.url+"/list") + if err != nil { + log.Errorf("failed to list plugins' names: %s", err) + return nil, err + } + log.Debugf("plugin list: %v", pluginList) + return pluginList, nil +} diff --git a/pkg/apisix/plugin_test.go b/pkg/apisix/plugin_test.go new file mode 100644 index 0000000000..3ee6c71cbe --- /dev/null +++ b/pkg/apisix/plugin_test.go @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package apisix + +import ( + "context" + "encoding/json" + "net/http" + "net/url" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "golang.org/x/net/nettest" +) + +type fakeAPISIXPluginSrv struct { + plugins []string +} + +var fakePluginNames = []string{ + "plugin-1", + "plugin-2", + "plugin-3", +} + +func (srv *fakeAPISIXPluginSrv) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + if !strings.HasPrefix(r.URL.Path, "/apisix/admin/plugins") { + w.WriteHeader(http.StatusNotFound) + return + } + + if r.Method == http.MethodGet { + data, _ := json.Marshal(srv.plugins) + _, _ = w.Write(data) + w.WriteHeader(http.StatusOK) + return + } +} + +func runFakePluginSrv(t *testing.T) *http.Server { + srv := &fakeAPISIXPluginSrv{ + plugins: fakePluginNames, + } + + ln, _ := nettest.NewLocalListener("tcp") + + httpSrv := &http.Server{ + Addr: ln.Addr().String(), + Handler: srv, + } + + go func() { + if err := httpSrv.Serve(ln); err != nil && err != http.ErrServerClosed { + t.Errorf("failed to run http server: %s", err) + } + }() + + return httpSrv +} + +func TestPluginClient(t *testing.T) { + srv := runFakePluginSrv(t) + defer func() { + assert.Nil(t, srv.Shutdown(context.Background())) + }() + + u := url.URL{ + Scheme: "http", + Host: srv.Addr, + Path: "/apisix/admin", + } + + closedCh := make(chan struct{}) + close(closedCh) + cli := newPluginClient(&cluster{ + baseURL: u.String(), + cli: http.DefaultClient, + cache: &dummyCache{}, + cacheSynced: closedCh, + }) + + // List + objs, err := cli.List(context.Background()) + assert.Nil(t, err) + assert.Len(t, objs, len(fakePluginNames)) + for i := range fakePluginNames { + assert.Equal(t, objs[i], fakePluginNames[i]) + } +} diff --git a/pkg/apisix/resource_test.go b/pkg/apisix/resource_test.go index 6723727e78..c640b971ae 100644 --- a/pkg/apisix/resource_test.go +++ b/pkg/apisix/resource_test.go @@ -19,6 +19,7 @@ import ( "testing" v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" + "github.com/stretchr/testify/assert" ) diff --git a/pkg/apisix/schema.go b/pkg/apisix/schema.go new file mode 100644 index 
0000000000..1727124083 --- /dev/null +++ b/pkg/apisix/schema.go @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apisix + +import ( + "context" + + "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" + "github.com/apache/apisix-ingress-controller/pkg/log" + v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" + + "go.uber.org/zap" +) + +type schemaClient struct { + url string + cluster *cluster +} + +func newSchemaClient(c *cluster) Schema { + return &schemaClient{ + url: c.baseURL + "/schema/", + cluster: c, + } +} + +// getSchema returns the schema of an APISIX object. +func (sc schemaClient) getSchema(ctx context.Context, name string) (*v1.Schema, error) { + log.Debugw("try to look up schema", + zap.String("name", name), + zap.String("url", sc.url), + zap.String("cluster", "default"), + ) + + // The schema cache is indexed by the Name field, so look it up by name. + schema, err := sc.cluster.cache.GetSchema(name) + if err == nil { + return schema, nil + } + if err == cache.ErrNotFound { + log.Debugw("failed to find schema in cache, will try to lookup from APISIX", + zap.String("name", name), + zap.Error(err), + ) + } else { + log.Errorw("failed to find schema in cache, will try to lookup from APISIX", + zap.String("name", name), + zap.Error(err), + ) + } + + // sc.url already carries a trailing slash. + url := sc.url + name + content, err := sc.cluster.getSchema(ctx, url) + if err != nil { + log.Errorw("failed to get schema from APISIX", + zap.String("name", name), + zap.String("url", url), + zap.String("cluster", "default"), + zap.Error(err), + ) + return nil, err + } + + schema = &v1.Schema{ + Name: name, + Content: content, + } + if err := sc.cluster.cache.InsertSchema(schema); err != nil { + log.Errorf("failed to reflect schema create to cache: %s", err) + return nil, err + } + return schema, nil +} + +// GetPluginSchema returns plugin's schema. +func (sc schemaClient) GetPluginSchema(ctx context.Context, pluginName string) (*v1.Schema, error) { + return sc.getSchema(ctx, "plugins/"+pluginName) +} diff --git a/pkg/apisix/schema_test.go b/pkg/apisix/schema_test.go new file mode 100644 index 0000000000..cc782fbb9e --- /dev/null +++ b/pkg/apisix/schema_test.go @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package apisix + +import ( + "context" + "net/http" + "net/url" + "strings" + "testing" + + "golang.org/x/net/nettest" + + "github.com/stretchr/testify/assert" +) + +type fakeAPISIXSchemaSrv struct { + schema map[string]string +} + +var testData = map[string]string{ + // plugins' schema + "plugins/key-auth": `{"$comment":"this is a mark for our injected plugin schema","type":"object","additionalProperties":false,"properties":{"disable":{"type":"boolean"},"header":{"default":"apikey","type":"string"}}}`, + "plugins/batch-requests": `{"$comment":"this is a mark for our injected plugin schema","type":"object","additionalProperties":false,"properties":{"disable":{"type":"boolean"}}}`, + "plugins/ext-plugin-pre-req": `{"properties":{"disable":{"type":"boolean"},"extra_info":{"items":{"type":"string","minLength":1,"maxLength":64},"minItems":1,"type":"array"},"conf":{"items":{"properties":{"value":{"type":"string"},"name":{"type":"string","minLength":1,"maxLength":128}},"type":"object"},"minItems":1,"type":"array"}},"$comment":"this is a mark for our injected plugin schema","type":"object"}`, +} + +const errMsg = `{"error_msg":"not found schema"}` + +func (srv *fakeAPISIXSchemaSrv) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + if !strings.HasPrefix(r.URL.Path, "/apisix/admin/schema") { + w.WriteHeader(http.StatusNotFound) + return + } + + if r.Method == http.MethodGet { + name := strings.Trim(strings.TrimPrefix(r.URL.Path, "/apisix/admin/schema/"), "/") + if len(name) < 1 { + w.WriteHeader(http.StatusBadRequest) + return + } + + if resp, ok := srv.schema[name]; ok { + _, _ = w.Write([]byte(resp)) + } else { + _, _ = w.Write([]byte(errMsg)) + } + w.WriteHeader(http.StatusOK) + return + } + +} + +func runFakeSchemaSrv(t *testing.T) *http.Server { + srv := &fakeAPISIXSchemaSrv{ + schema: testData, + } + + ln, _ := nettest.NewLocalListener("tcp") + + httpSrv := &http.Server{ + Addr: ln.Addr().String(), + Handler: srv, + } + + go func() { + if err := httpSrv.Serve(ln); err != nil && err != http.ErrServerClosed { + t.Errorf("failed to run http server: %s", err) + } + }() + + return httpSrv +} + +func TestSchemaClient(t *testing.T) { + srv := runFakeSchemaSrv(t) + defer func() { + assert.Nil(t, srv.Shutdown(context.Background())) + }() + + u := url.URL{ + Scheme: "http", + Host: srv.Addr, + Path: "/apisix/admin", + } + + closedCh := make(chan struct{}) + close(closedCh) + cli := newSchemaClient(&cluster{ + baseURL: u.String(), + cli: http.DefaultClient, + cache: &dummyCache{}, + cacheSynced: closedCh, + }) + + // Test `GetPluginSchema`. + for name := range testData { + list := strings.Split(name, "/") + assert.Greater(t, len(list), 0) + + schema, err := cli.GetPluginSchema(context.Background(), list[len(list)-1]) + assert.Nil(t, err) + assert.Equal(t, schema.Name, name) + assert.Equal(t, schema.Content, testData[name]) + } + + // Test getting a non-existent plugin's schema. 
+ schema, err := cli.GetPluginSchema(context.Background(), "not-a-plugin") + assert.Nil(t, err) + assert.Equal(t, schema.Content, errMsg) +} diff --git a/pkg/apisix/ssl_test.go b/pkg/apisix/ssl_test.go index 4f8585c1d1..41602a74e2 100644 --- a/pkg/apisix/ssl_test.go +++ b/pkg/apisix/ssl_test.go @@ -28,6 +28,7 @@ import ( "testing" v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" + "github.com/stretchr/testify/assert" "golang.org/x/net/nettest" ) diff --git a/pkg/apisix/stream_route_test.go b/pkg/apisix/stream_route_test.go index 35411caad7..778131b51b 100644 --- a/pkg/apisix/stream_route_test.go +++ b/pkg/apisix/stream_route_test.go @@ -27,6 +27,7 @@ import ( "testing" v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" + "github.com/stretchr/testify/assert" "golang.org/x/net/nettest" ) diff --git a/pkg/ingress/apisix_cluster_config.go b/pkg/ingress/apisix_cluster_config.go index 9af9f3df58..835203b68f 100644 --- a/pkg/ingress/apisix_cluster_config.go +++ b/pkg/ingress/apisix_cluster_config.go @@ -136,7 +136,7 @@ func (c *apisixClusterConfigController) sync(ctx context.Context, ev *types.Even ) // TODO we may first call AddCluster. // Since now we already have the default cluster, we just call UpdateCluster. - if err := c.controller.apisix.UpdateCluster(clusterOpts); err != nil { + if err := c.controller.apisix.UpdateCluster(ctx, clusterOpts); err != nil { log.Errorw("failed to update cluster", zap.String("cluster_name", acc.Name), zap.Error(err), diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go index 989725696e..4d59b52dc1 100644 --- a/pkg/ingress/controller.go +++ b/pkg/ingress/controller.go @@ -21,8 +21,6 @@ import ( "sync" "time" - apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" - configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1" "go.uber.org/zap" v1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -39,8 +37,10 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/api" "github.com/apache/apisix-ingress-controller/pkg/apisix" + apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" "github.com/apache/apisix-ingress-controller/pkg/config" "github.com/apache/apisix-ingress-controller/pkg/kube" + configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1" apisixscheme "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/scheme" listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1" listersv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2alpha1" @@ -379,7 +379,7 @@ func (c *Controller) run(ctx context.Context) { AdminKey: c.cfg.APISIX.DefaultClusterAdminKey, BaseURL: c.cfg.APISIX.DefaultClusterBaseURL, } - err := c.apisix.AddCluster(clusterOpts) + err := c.apisix.AddCluster(ctx, clusterOpts) if err != nil && err != apisix.ErrDuplicatedCluster { // TODO give up the leader role log.Errorf("failed to add default cluster: %s", err) @@ -391,7 +391,7 @@ func (c *Controller) run(ctx context.Context) { log.Errorf("failed to wait the default cluster to be ready: %s", err) // re-create apisix cluster, used in next c.run - if err = c.apisix.UpdateCluster(clusterOpts); err != nil { + if err = c.apisix.UpdateCluster(ctx, clusterOpts); err != nil { log.Errorf("failed to update default cluster: %s", err) return } diff --git a/pkg/kube/translation/ingress_test.go 
b/pkg/kube/translation/ingress_test.go index 57d119b7ff..feb0026b1c 100644 --- a/pkg/kube/translation/ingress_test.go +++ b/pkg/kube/translation/ingress_test.go @@ -16,21 +16,20 @@ package translation import ( "context" - "github.com/apache/apisix-ingress-controller/pkg/kube" "testing" - extensionsv1beta1 "k8s.io/api/extensions/v1beta1" - networkingv1beta1 "k8s.io/api/networking/v1beta1" - "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" + extensionsv1beta1 "k8s.io/api/extensions/v1beta1" networkingv1 "k8s.io/api/networking/v1" + networkingv1beta1 "k8s.io/api/networking/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" + "github.com/apache/apisix-ingress-controller/pkg/kube" fakeapisix "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/fake" apisixinformers "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions" ) diff --git a/pkg/kube/translation/plugin_test.go b/pkg/kube/translation/plugin_test.go index 2cfa206340..9a2642cb04 100644 --- a/pkg/kube/translation/plugin_test.go +++ b/pkg/kube/translation/plugin_test.go @@ -16,7 +16,6 @@ package translation import ( "context" - "github.com/apache/apisix-ingress-controller/pkg/kube" "testing" "github.com/stretchr/testify/assert" @@ -28,6 +27,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/apache/apisix-ingress-controller/pkg/id" + "github.com/apache/apisix-ingress-controller/pkg/kube" configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1" apisixfake "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/fake" apisixinformers "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions" diff --git a/pkg/kube/translation/translator_test.go b/pkg/kube/translation/translator_test.go index f744403feb..19fe3cf5fd 100644 --- a/pkg/kube/translation/translator_test.go +++ b/pkg/kube/translation/translator_test.go @@ -18,19 +18,18 @@ import ( "context" "testing" - "github.com/apache/apisix-ingress-controller/pkg/kube" - discoveryv1 "k8s.io/api/discovery/v1" - - apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" + "github.com/apache/apisix-ingress-controller/pkg/kube" configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1" + apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) func TestTranslateUpstreamConfig(t *testing.T) { diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go index 9c273339c0..2858d53a3d 100644 --- a/pkg/types/apisix/v1/types.go +++ b/pkg/types/apisix/v1/types.go @@ -463,3 +463,23 @@ func ComposeConsumerName(namespace, name string) string { return buf.String() } + +// Schema represents the schema of APISIX objects. 
+type Schema struct { + Name string `json:"name,omitempty" yaml:"name,omitempty"` + Content string `json:"content,omitempty" yaml:"content,omitempty"` +} + +func (s *Schema) DeepCopyInto(out *Schema) { + b, _ := json.Marshal(&s) + _ = json.Unmarshal(b, out) +} + +func (s *Schema) DeepCopy() *Schema { + if s == nil { + return nil + } + out := new(Schema) + s.DeepCopyInto(out) + return out +} diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go index 941ec41ea9..cbbe869e0c 100644 --- a/test/e2e/scaffold/k8s.go +++ b/test/e2e/scaffold/k8s.go @@ -238,7 +238,7 @@ func (s *Scaffold) ListApisixUpstreams() ([]*v1.Upstream, error) { if err != nil { return nil, err } - err = cli.AddCluster(&apisix.ClusterOptions{ + err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{ BaseURL: u.String(), AdminKey: s.opts.APISIXAdminAPIKey, }) @@ -259,7 +259,7 @@ func (s *Scaffold) ListApisixGlobalRules() ([]*v1.GlobalRule, error) { if err != nil { return nil, err } - err = cli.AddCluster(&apisix.ClusterOptions{ + err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{ BaseURL: u.String(), AdminKey: s.opts.APISIXAdminAPIKey, }) @@ -280,7 +280,7 @@ func (s *Scaffold) ListApisixRoutes() ([]*v1.Route, error) { if err != nil { return nil, err } - err = cli.AddCluster(&apisix.ClusterOptions{ + err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{ BaseURL: u.String(), AdminKey: s.opts.APISIXAdminAPIKey, }) @@ -301,7 +301,7 @@ func (s *Scaffold) ListApisixConsumers() ([]*v1.Consumer, error) { if err != nil { return nil, err } - err = cli.AddCluster(&apisix.ClusterOptions{ + err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{ BaseURL: u.String(), AdminKey: s.opts.APISIXAdminAPIKey, }) @@ -322,7 +322,7 @@ func (s *Scaffold) ListApisixStreamRoutes() ([]*v1.StreamRoute, error) { if err != nil { return nil, err } - err = cli.AddCluster(&apisix.ClusterOptions{ + err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{ BaseURL: u.String(), AdminKey: s.opts.APISIXAdminAPIKey, }) @@ -343,7 +343,7 @@ func (s *Scaffold) ListApisixSsl() ([]*v1.Ssl, error) { if err != nil { return nil, err } - err = cli.AddCluster(&apisix.ClusterOptions{ + err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{ BaseURL: u.String(), AdminKey: s.opts.APISIXAdminAPIKey, })
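Finally, a hedged sketch of how the new v1.Schema type and the schema-aware memdb cache fit together, condensing the behaviour exercised by TestMemDBCacheSchema; the plugin name and content below are illustrative.

package example

import (
    "fmt"

    "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
    v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

func schemaCacheRoundTrip() error {
    c, err := cache.NewMemDBCache()
    if err != nil {
        return err
    }

    s := &v1.Schema{
        Name:    "plugins/key-auth",
        Content: `{"type":"object"}`,
    }
    // InsertSchema stores a deep copy (via the JSON-based DeepCopy above),
    // so mutating s afterwards does not change what the cache holds.
    if err := c.InsertSchema(s); err != nil {
        return err
    }
    s.Content = "mutated"

    // The schema table's primary index is the Name field.
    got, err := c.GetSchema("plugins/key-auth")
    if err != nil {
        return err
    }
    fmt.Println(got.Content) // still prints {"type":"object"}

    return c.DeleteSchema(got)
}

This is essentially the round trip syncSchemaOnce performs on every tick: drop the cached schemas, re-fetch them from APISIX, and insert the fresh copies.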