Skip to content

Commit

Permalink
refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
Thenujan-Nagaratnam committed May 2, 2024
1 parent dead66f commit e3b98ab
Showing 1 changed file with 30 additions and 27 deletions.
57 changes: 30 additions & 27 deletions import-export-cli/impl/uploadAPIs.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,17 @@ func UploadAPIs(credential credentials.Credential, cmdUploadEnvironment string,
fmt.Println("Uploading public APIs to vector DB...")

// buffered channel with 10 slots
payloadQueue := make(chan []map[string]string, 10)
apiListQueue := make(chan []map[string]interface{}, 10)

// producer
go produceAPIPayloads(devPortalEndpoint, payloadQueue)
go produceAPIPayloads(devPortalEndpoint, apiListQueue)

// consumer
numConsumers := 2
numConsumers := 3
var wg sync.WaitGroup
for i := 0; i < numConsumers; i++ {
wg.Add(1)
go consumeAPIPayloads(payloadQueue, &wg)
go consumeAPIPayloads(apiListQueue, &wg)
}

wg.Wait()
Expand All @@ -106,12 +106,12 @@ func InvokeGETRequest(requestURL, tenant string) (*resty.Response, error) {
return utils.InvokeGETRequest(requestURL, headers)
}

func produceAPIPayloads(devPortalEndpoint string, payloadQueue chan<- []map[string]string) {
processTenants(devPortalEndpoint, "tenants?state=active&limit=100&offset=0", payloadQueue)
close(payloadQueue)
func produceAPIPayloads(devPortalEndpoint string, apiListQueue chan<- []map[string]interface{}) {
processTenants(devPortalEndpoint, "tenants?state=active&limit=100&offset=0", apiListQueue)
close(apiListQueue)
}

func processTenants(devPortalEndpoint, endpointPath string, payloadQueue chan<- []map[string]string) {
// process all the tenants
func processTenants(devPortalEndpoint, endpointPath string, apiListQueue chan<- []map[string]interface{}) {
devPortalEndpoint = utils.AppendSlashToString(devPortalEndpoint)

requestURL := devPortalEndpoint + endpointPath
Expand All @@ -132,22 +132,23 @@ func processTenants(devPortalEndpoint, endpointPath string, payloadQueue chan<-
if tenantCount == 0 {
// Handle carbon.super tenant
fmt.Println("Processing tenant:", utils.DefaultTenantDomain)
processAPIs(devPortalEndpoint, utils.DefaultTenantDomain, "apis?limit=5&offset=0", payloadQueue)
processAPIs(devPortalEndpoint, utils.DefaultTenantDomain, "apis?limit=10&offset=0", apiListQueue)
} else {
// Handle all tenants
for _, tenant := range tenantListResponse.List {
fmt.Println("Processing tenant:", tenant.Domain)
processAPIs(devPortalEndpoint, tenant.Domain, "apis?limit=5&offset=0", payloadQueue)
processAPIs(devPortalEndpoint, tenant.Domain, "apis?limit=10&offset=0", apiListQueue)
}
}

// Process next set of tenants
if tenantListResponse.Pagination.Next != "" {
processTenants(devPortalEndpoint, tenantListResponse.Pagination.Next, payloadQueue)
processTenants(devPortalEndpoint, tenantListResponse.Pagination.Next, apiListQueue)
}
}

func processAPIs(devPortalEndpoint, tenant, endpointPath string, payloadQueue chan<- []map[string]string) {
// process apis in a tenant
func processAPIs(devPortalEndpoint, tenant, endpointPath string, apiListQueue chan<- []map[string]interface{}) {
requestURL := devPortalEndpoint + endpointPath

resp, err := InvokeGETRequest(requestURL, tenant)
Expand All @@ -164,10 +165,10 @@ func processAPIs(devPortalEndpoint, tenant, endpointPath string, payloadQueue ch
// Update totalAPIs count
atomic.AddInt32(&totalAPIs, apiListResponse.Count)

payload := []map[string]string{}
apiList := []map[string]interface{}{}

for _, api := range apiListResponse.List {
apiPayload := map[string]string{
apiPayload := map[string]interface{}{
"uuid": api.ID,
"description": api.Description,
"api_name": api.Name,
Expand Down Expand Up @@ -204,28 +205,30 @@ func processAPIs(devPortalEndpoint, tenant, endpointPath string, payloadQueue ch
utils.HandleErrorAndContinue("Error in getting async spec for API: "+api.ID, err)
}
}
payload = append(payload, apiPayload)
apiList = append(apiList, apiPayload)

}
payloadQueue <- payload
apiListQueue <- apiList

// Process next set of APIs
if apiListResponse.Pagination.Next != "" {
processAPIs(devPortalEndpoint, tenant, apiListResponse.Pagination.Next, payloadQueue)
processAPIs(devPortalEndpoint, tenant, apiListResponse.Pagination.Next, apiListQueue)
}
}

func consumeAPIPayloads(payloadQueue <-chan []map[string]string, wg *sync.WaitGroup) {
// get apiList from the queue and upload them
func consumeAPIPayloads(apiListQueue <-chan []map[string]interface{}, wg *sync.WaitGroup) {
defer wg.Done()

for payload := range payloadQueue {
InvokePOSTRequest(payload)
for apiList := range apiListQueue {
InvokePOSTRequest(apiList)
}
}

func InvokePOSTRequest(payload []map[string]string) {
fmt.Printf("Uploading %d APIs for tenant: %s\n", len(payload), payload[0]["tenant_domain"])
jsonData, err := json.Marshal(map[string]interface{}{"apis": payload})
// InvokePOSTRequest uploads the APIs to the vector DB
func InvokePOSTRequest(apiList []map[string]interface{}) {
fmt.Printf("Uploading %d APIs for tenant: %s\n", len(apiList), apiList[0]["tenant_domain"])
payload, err := json.Marshal(map[string]interface{}{"apis": apiList})
if err != nil {
utils.HandleErrorAndContinue("Error in marshalling payload:", err)
}
Expand All @@ -238,14 +241,14 @@ func InvokePOSTRequest(payload []map[string]string) {
var uploadErr error

for attempt := 1; attempt <= 2; attempt++ {
resp, uploadErr = utils.InvokePOSTRequest(endpoint+"/ai/spec-populator/bulk-upload", headers, jsonData)
resp, uploadErr = utils.InvokePOSTRequest(endpoint+"/ai/spec-populator/bulk-upload", headers, payload)
if uploadErr != nil {
fmt.Printf("API upload failed (attempt %d). Reason: %v\n", attempt, uploadErr)
continue
}

if resp.StatusCode() != 200 {
fmt.Printf("Failed to upload %d APIs for tenant %s with status %d %s (attempt %d).\n", len(payload), payload[0]["tenant_domain"], resp.StatusCode(), resp.Body(), attempt)
fmt.Printf("Failed to upload %d APIs for tenant %s with status %d %s (attempt %d).\n", len(apiList), apiList[0]["tenant_domain"], resp.StatusCode(), resp.Body(), attempt)
continue
}

Expand All @@ -258,7 +261,7 @@ func InvokePOSTRequest(payload []map[string]string) {
continue
}

fmt.Printf("%d APIs uploaded successfully for tenant: %s (attempt %d)\n", len(payload), payload[0]["tenant_domain"], attempt)
fmt.Printf("%d APIs uploaded successfully for tenant: %s (attempt %d)\n", len(apiList), apiList[0]["tenant_domain"], attempt)
atomic.AddInt32(&uploadedAPIs, jsonResp["message"]["upsert_count"])
break
}
Expand Down

0 comments on commit e3b98ab

Please sign in to comment.