Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tharindu1st committed Feb 6, 2024
1 parent 60d4d73 commit 651210d
Show file tree
Hide file tree
Showing 10 changed files with 590 additions and 64 deletions.
1 change: 1 addition & 0 deletions common-controller/internal/config/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var defaultConfig = &Config{
Enabled: false,
Host: "localhost",
EventPort: 18000,
RestPort: 18001,
RetryInterval: 5,
Persistence: persistence{Type: "K8s"}},
},
Expand Down
16 changes: 1 addition & 15 deletions common-controller/internal/config/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package config

import (
"fmt"
"io/ioutil"
"os"
"reflect"
"regexp"
Expand All @@ -34,8 +33,6 @@ import (

var (
onceConfigRead sync.Once
apkHome string
logConfigPath string
controllerConfig *Config
envVariableMap map[string]string
)
Expand Down Expand Up @@ -66,7 +63,7 @@ func ReadConfigs() *Config {
if err != nil {
loggerConfig.ErrorC(logging.PrintError(logging.Error1000, logging.BLOCKER, "Configuration file not found, error: %v", err.Error()))
}
content, readErr := ioutil.ReadFile(pkgconf.GetApkHome() + relativeConfigPath)
content, readErr := os.ReadFile(pkgconf.GetApkHome() + relativeConfigPath)
if readErr != nil {
loggerConfig.ErrorC(logging.PrintError(logging.Error1001, logging.BLOCKER, "Error reading configurations, error: %v", readErr.Error()))
return
Expand Down Expand Up @@ -367,14 +364,3 @@ func resolveEnvFloat32Value(key string, value reflect.Value) {
value.SetFloat(resolvedValue)
}
}

func extractEnvironmentVars() {
envVariableArray := os.Environ()
for _, variable := range envVariableArray {
if strings.HasPrefix(strings.ToUpper(variable), envVariablePrefix) {
formattedVariable := strings.Split(variable, "=")[0]
envVariableValue := os.Getenv(formattedVariable)
envVariableMap[strings.ToUpper(formattedVariable)] = envVariableValue
}
}
}
13 changes: 7 additions & 6 deletions common-controller/internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ type commoncontroller struct {
ControlPlane controlplane
}
type controlplane struct {
Enabled bool
Host string
EventPort int
RestPort int
RetryInterval time.Duration
Persistence persistence
Enabled bool
Host string
EventPort int
RestPort int
RetryInterval time.Duration
Persistence persistence
SkipSSLVerification bool
}
type persistence struct {
Type string
Expand Down
4 changes: 0 additions & 4 deletions common-controller/internal/controlplane/artifact_deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ type ArtifactDeployer interface {
DeleteSubscription(subscriptionID string) error
DeleteApplicationMappings(applicationID string) error
DeleteKeyMappings(keyMapping server.ApplicationKeyMapping) error
DeleteAllApplications() error
DeleteAllSubscriptions() error
DeleteAllApplicationMappings() error
DeleteAllKeyMappings() error
DeployAllApplications(applications server.ApplicationList) error
DeployAllSubscriptions(subscriptions server.SubscriptionList) error
DeployAllApplicationMappings(applicationMappings server.ApplicationMappingList) error
Expand Down
229 changes: 227 additions & 2 deletions common-controller/internal/controlplane/controlplane_client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,32 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package controlplane

import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"reflect"
"strconv"
"time"

grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
Expand All @@ -31,8 +53,29 @@ type Agent struct {
}

var (
subsriptionList *SubscriptionList
applicationList *ApplicationList
appKeyMappingList *ApplicationKeyMappingList
appMappingList *ApplicationMappingList
connectionFaultChannel chan bool
eventStreamingClient apkmgt.EventStreamService_StreamEventsClient
resources = []resource{
{
endpoint: "/subscriptions",
responseType: subsriptionList,
},
{
endpoint: "/applications",
responseType: applicationList,
},
{
endpoint: "/applicationkeymappings",
responseType: appKeyMappingList,
},
{endpoint: "/applicationmappings",
responseType: appMappingList,
},
}
)

func init() {
Expand Down Expand Up @@ -130,7 +173,7 @@ func (controlPlaneGrpcClient *Agent) initializeGrpcStreaming() *grpc.ClientConn
func (controlPlaneGrpcClient *Agent) handleEvents(event *subscription.Event) {
loggers.LoggerAPKOperator.Infof("Received event %s", event.Type)
if event.Type == constants.AllEvnts {
retrieveAllData()
go controlPlaneGrpcClient.retrieveAllData()
} else if event.Type == constants.ApplicationCreated {
loggers.LoggerAPKOperator.Infof("Received APPLICATION_CREATED event.")
if event.Application != nil {
Expand All @@ -139,6 +182,7 @@ func (controlPlaneGrpcClient *Agent) handleEvents(event *subscription.Event) {
Owner: event.Application.Owner,
OrganizationID: event.Application.Organization,
Attributes: event.Application.Attributes,
TimeStamp: event.TimeStamp,
}
loggers.LoggerAPKOperator.Infof("Received Application %s", application.UUID)
controlPlaneGrpcClient.artifactDeployer.DeployApplication(application)
Expand Down Expand Up @@ -260,6 +304,187 @@ func (controlPlaneGrpcClient *Agent) handleEvents(event *subscription.Event) {

}
}
func retrieveAllData() {
func (controlPlaneGrpcClient *Agent) retrieveAllData() {
var responseChannel = make(chan response)
config := config.ReadConfigs()
for _, url := range resources {
// Create a local copy of the loop variable
localURL := url

go InvokeService(localURL.endpoint, localURL.responseType, nil, responseChannel, 0)

for {
data := <-responseChannel
loggers.LoggerAPKOperator.Info("Receiving subscription data for an environment")
if data.Payload != nil {
loggers.LoggerAPKOperator.Info("Payload data information received" + string(data.Payload))
controlPlaneGrpcClient.retrieveDataFromResponseChannel(data)
break
} else if data.ErrorCode >= 400 && data.ErrorCode < 500 {
//Error handle
loggers.LoggerAPKOperator.Info("Error data information received")
//health.SetControlPlaneRestAPIStatus(false)
} else {
// Keep the iteration going on until a response is received.
// Error handle
go func(d response, endpoint string, responseType interface{}) {
// Retry fetching from control plane after a configured time interval
if config.CommonController.ControlPlane.RetryInterval == 0 {
// Assign default retry interval
config.CommonController.ControlPlane.RetryInterval = 5
}
loggers.LoggerAPKOperator.Debugf("Time Duration for retrying: %v", config.CommonController.ControlPlane.RetryInterval*time.Second)
time.Sleep(config.CommonController.ControlPlane.RetryInterval * time.Second)
loggers.LoggerAPKOperator.Infof("Retrying to fetch APIs from control plane. Time Duration for the next retry: %v", config.CommonController.ControlPlane.RetryInterval*time.Second)
go InvokeService(endpoint, responseType, nil, responseChannel, 0)
}(data, localURL.endpoint, localURL.responseType)
}
}
}
}

type resource struct {
endpoint string
responseType interface{}
}

type response struct {
Error error
Payload []byte
ErrorCode int
Endpoint string
Type interface{}
}

// InvokeService invokes the internal data resource
func InvokeService(endpoint string, responseType interface{}, queryParamMap map[string]string, c chan response,
retryAttempt int) {
config := config.ReadConfigs()
serviceURL := "https://" + config.CommonController.ControlPlane.Host + ":" + strconv.Itoa(config.CommonController.ControlPlane.RestPort) + endpoint
// Create the request
req, err := http.NewRequest("GET", serviceURL, nil)
if err != nil {
c <- response{err, nil, 0, endpoint, responseType}
loggers.LoggerAPKOperator.Errorf("Error occurred while creating an HTTP request for serviceURL: "+serviceURL, err)
return
}
q := req.URL.Query()
req.URL.RawQuery = q.Encode()

// Check if TLS is enabled
skipSSL := config.CommonController.ControlPlane.SkipSSLVerification
resp, err := InvokeControlPlane(req, skipSSL)

if err != nil {
if resp != nil {
c <- response{err, nil, resp.StatusCode, endpoint, responseType}
} else {
c <- response{err, nil, 0, endpoint, responseType}
}
loggers.LoggerAPKOperator.Infof("Error occurred while calling the REST API: "+serviceURL, err)
return
}

responseBytes, err := ioutil.ReadAll(resp.Body)
if resp.StatusCode == http.StatusOK {
if err != nil {
c <- response{err, nil, resp.StatusCode, endpoint, responseType}
loggers.LoggerAPKOperator.Infof("Error occurred while reading the response received for: "+serviceURL, err)
return
}
c <- response{nil, responseBytes, resp.StatusCode, endpoint, responseType}
} else {
c <- response{errors.New(string(responseBytes)), nil, resp.StatusCode, endpoint, responseType}
loggers.LoggerAPKOperator.Infof("Failed to fetch data! "+serviceURL+" responded with "+strconv.Itoa(resp.StatusCode),
err)
}
}

// InvokeControlPlane sends request to the control plane and returns the response
func InvokeControlPlane(req *http.Request, skipSSL bool) (*http.Response, error) {
tr := &http.Transport{}
if !skipSSL {
_, _, truststoreLocation := utils.GetKeyLocations()
caCertPool := utils.GetTrustedCertPool(truststoreLocation)
tr = &http.Transport{
TLSClientConfig: &tls.Config{RootCAs: caCertPool},
}
} else {
tr = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
}

// Configuring the http client
client := &http.Client{
Transport: tr,
}
return client.Do(req)
}
func (controlPlaneGrpcClient *Agent) retrieveDataFromResponseChannel(response response) {
responseType := reflect.TypeOf(response.Type).Elem()
newResponse := reflect.New(responseType).Interface()
err := json.Unmarshal(response.Payload, &newResponse)

if err != nil {
loggers.LoggerAPI.Infof("Error occurred while unmarshalling the response received for: "+response.Endpoint, err)
} else {
switch t := newResponse.(type) {
case *SubscriptionList:
loggers.LoggerAPI.Infof("Received Subscription information.")
subList := newResponse.(*SubscriptionList)
resolvedSubscriptionList := marshalMultipleSubscriptions(subList)
controlPlaneGrpcClient.artifactDeployer.DeployAllSubscriptions(resolvedSubscriptionList)

case *ApplicationList:
loggers.LoggerAPI.Infof("Received Application information.")
appList := newResponse.(*ApplicationList)
resolvedApplicationList := marshalMultipleApplications(appList)
controlPlaneGrpcClient.artifactDeployer.DeployAllApplications(resolvedApplicationList)
case *ApplicationKeyMappingList:
loggers.LoggerAPI.Infof("Received Application Key Mapping information.")
appKeyMappingList := newResponse.(*ApplicationKeyMappingList)
resolvedApplicationKeyMappingList := marshalMultipleApplicationKeyMappings(appKeyMappingList)
controlPlaneGrpcClient.artifactDeployer.DeployAllKeyMappings(resolvedApplicationKeyMappingList)
case *ApplicationMappingList:
loggers.LoggerAPI.Infof("Received Application Mapping information.")
appMappingList := newResponse.(*ApplicationMappingList)
resolvedApplicationMappingList := marshalMultipleApplicationMappings(appMappingList)
controlPlaneGrpcClient.artifactDeployer.DeployAllApplicationMappings(resolvedApplicationMappingList)
default:
loggers.LoggerAPI.Debugf("Unknown type %T", t)
}
}
}
func marshalMultipleSubscriptions(subList *SubscriptionList) server.SubscriptionList {
subscriptionList := server.SubscriptionList{List: []server.Subscription{}}
for _, subscription := range subList.List {
loggers.LoggerAPI.Debugf("Subscription: %v", subscription)
subscriptionList.List = append(subscriptionList.List, server.Subscription{UUID: subscription.UUID, Organization: subscription.Organization, SubStatus: subscription.SubStatus, SubscribedAPI: &server.SubscribedAPI{Name: subscription.SubscribedAPI.Name, Version: subscription.SubscribedAPI.Version}})
}
return subscriptionList
}
func marshalMultipleApplications(appList *ApplicationList) server.ApplicationList {
applicationList := server.ApplicationList{List: []server.Application{}}
for _, application := range appList.List {
loggers.LoggerAPI.Debugf("Application: %v", application)
applicationList.List = append(applicationList.List, server.Application{UUID: application.UUID, Name: application.Name, Owner: application.Owner, OrganizationID: application.Organization, Attributes: application.Attributes})
}
return applicationList
}
func marshalMultipleApplicationKeyMappings(appKeyMappingList *ApplicationKeyMappingList) server.ApplicationKeyMappingList {
applicationKeyMappingList := server.ApplicationKeyMappingList{List: []server.ApplicationKeyMapping{}}
for _, applicationKeyMapping := range appKeyMappingList.List {
loggers.LoggerAPI.Debugf("ApplicationKeyMapping: %v", applicationKeyMapping)
applicationKeyMappingList.List = append(applicationKeyMappingList.List, server.ApplicationKeyMapping{ApplicationUUID: applicationKeyMapping.ApplicationUUID, SecurityScheme: applicationKeyMapping.SecurityScheme, ApplicationIdentifier: applicationKeyMapping.ApplicationIdentifier, KeyType: applicationKeyMapping.KeyType, EnvID: applicationKeyMapping.EnvID, OrganizationID: applicationKeyMapping.Organization})
}
return applicationKeyMappingList
}
func marshalMultipleApplicationMappings(appMappingList *ApplicationMappingList) server.ApplicationMappingList {
applicationMappingList := server.ApplicationMappingList{List: []server.ApplicationMapping{}}
for _, applicationMapping := range appMappingList.List {
loggers.LoggerAPI.Debugf("ApplicationMapping: %v", applicationMapping)
applicationMappingList.List = append(applicationMappingList.List, server.ApplicationMapping{UUID: applicationMapping.UUID, ApplicationRef: applicationMapping.ApplicationRef, SubscriptionRef: applicationMapping.SubscriptionRef, OrganizationID: applicationMapping.Organization})
}
return applicationMappingList
}
Loading

0 comments on commit 651210d

Please sign in to comment.