Skip to content

Commit

Permalink
Avoiding rate limits for internal services (#864)
Browse files Browse the repository at this point in the history
  • Loading branch information
vthwang authored Dec 17, 2024
1 parent c00c9a1 commit 3b701b7
Show file tree
Hide file tree
Showing 37 changed files with 348 additions and 303 deletions.
16 changes: 14 additions & 2 deletions cmd/dataproxy/dataproxy/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
package main

import "github.com/MurmurationsNetwork/MurmurationsServices/services/dataproxy/pkg/dataproxy"
import (
"github.com/MurmurationsNetwork/MurmurationsServices/pkg/logger"
"github.com/MurmurationsNetwork/MurmurationsServices/services/dataproxy/pkg/dataproxy"
)

func main() {
dataproxy.StartApplication()
logger.Info("Dataproxy service starting")

s := dataproxy.NewService()

go func() {
<-s.WaitUntilUp()
logger.Info("Dataproxy service started")
}()

s.Run()
}
15 changes: 7 additions & 8 deletions cmd/dataproxy/seeder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,17 @@ const (
)

func Init() {
config.Init()
mongoInit()
}

func mongoInit() {
uri := mongo.GetURI(
config.Conf.Mongo.USERNAME,
config.Conf.Mongo.PASSWORD,
config.Conf.Mongo.HOST,
config.Values.Mongo.USERNAME,
config.Values.Mongo.PASSWORD,
config.Values.Mongo.HOST,
)

err := mongo.NewClient(uri, config.Conf.Mongo.DBName)
err := mongo.NewClient(uri, config.Values.Mongo.DBName)
if err != nil {
fmt.Println("Error when trying to connect to MongoDB.", err)
os.Exit(1)
Expand Down Expand Up @@ -164,7 +163,7 @@ func importData(
}

// Validate data
validateURL := config.Conf.Index.URL + "/v2/validate"
validateURL := config.Values.Index.URL + "/v2/validate"
isValid, failureReasons, err := importutil.Validate(
validateURL,
profileJSON,
Expand Down Expand Up @@ -193,8 +192,8 @@ func importData(
}

// Post to index service
postNodeURL := config.Conf.Index.URL + "/v2/nodes"
profileURL := config.Conf.DataProxy.URL + "/v1/profiles/" + profileJSON["cuid"].(string)
postNodeURL := config.Values.Index.URL + "/v2/nodes"
profileURL := config.Values.DataProxy.URL + "/v1/profiles/" + profileJSON["cuid"].(string)
nodeID, err := importutil.PostIndex(postNodeURL, profileURL)
if err != nil {
return false, fmt.Errorf(
Expand Down
15 changes: 7 additions & 8 deletions cmd/dataproxy/ukseeder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,17 @@ const (
)

func Init() {
config.Init()
mongoInit()
}

func mongoInit() {
uri := mongo.GetURI(
config.Conf.Mongo.USERNAME,
config.Conf.Mongo.PASSWORD,
config.Conf.Mongo.HOST,
config.Values.Mongo.USERNAME,
config.Values.Mongo.PASSWORD,
config.Values.Mongo.HOST,
)

err := mongo.NewClient(uri, config.Conf.Mongo.DBName)
err := mongo.NewClient(uri, config.Values.Mongo.DBName)
if err != nil {
fmt.Println("Error when trying to connect to MongoDB.", err)
os.Exit(1)
Expand Down Expand Up @@ -214,7 +213,7 @@ func importData(row int, file *excelize.File) (bool, error) {
profile["metadata"] = metadata

// Validate data
validateURL := config.Conf.Index.URL + "/v2/validate"
validateURL := config.Values.Index.URL + "/v2/validate"
isValid, failureReasons, err := importutil.Validate(validateURL, profile)
if err != nil {
return false, fmt.Errorf(
Expand All @@ -240,8 +239,8 @@ func importData(row int, file *excelize.File) (bool, error) {
}

// Post to index service
postNodeURL := config.Conf.Index.URL + "/v2/nodes"
profileURL := config.Conf.DataProxy.URL + "/v1/profiles/" + profile["cuid"].(string)
postNodeURL := config.Values.Index.URL + "/v2/nodes"
profileURL := config.Values.DataProxy.URL + "/v1/profiles/" + profile["cuid"].(string)
nodeID, err := importutil.PostIndex(postNodeURL, profileURL)
if err != nil {
return false, fmt.Errorf(
Expand Down
6 changes: 6 additions & 0 deletions pkg/middleware/limiter/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func NewRateLimitWithOptions(options RateLimitOptions) gin.HandlerFunc {
options.Method == defaultMethod {
ip := c.ClientIP()

// If IP starts with "10", bypass rate limiting
if len(ip) >= 3 && ip[:3] == "10." {
c.Next()
return
}

context, err := ipRateLimiter.Get(c, ip)
if err != nil {
logger.Error(
Expand Down
12 changes: 1 addition & 11 deletions services/dataproxy/config/config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package config

import (
"log"
"time"

env "github.com/caarlos0/env/v10"
)

var Conf = config{}
var Values = config{}

type config struct {
Server serverConf
Expand Down Expand Up @@ -44,10 +41,3 @@ type mongoConf struct {
HOST string `env:"MONGO_HOST,required"`
DBName string `env:"MONGO_DB_NAME,required"`
}

func Init() {
err := env.Parse(&Conf)
if err != nil {
log.Fatalf("Failed to decode environment variables: %s", err)
}
}
40 changes: 0 additions & 40 deletions services/dataproxy/global/init.go

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package http
package rest

import (
"encoding/csv"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package http
package rest

import (
"encoding/json"
Expand Down Expand Up @@ -47,7 +47,7 @@ func (handler *mappingsHandler) Create(c *gin.Context) {
// download mapping from cdn
url := fmt.Sprintf(
"%s/v2/schemas/%s",
config.Conf.Library.InternalURL,
config.Values.Library.InternalURL,
schema,
)
//var schemas map[string]interface{}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package http
package rest

import (
"net/http"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package http
package rest

import (
"net/http"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package http
package rest

import (
"net/http"
Expand Down
12 changes: 6 additions & 6 deletions services/dataproxy/internal/service/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,8 @@ func (s *batchService) Import(
}

// Post the profile to the index service.
postNodeURL := config.Conf.Index.URL + "/v2/nodes"
profileURL := config.Conf.DataProxy.URL + "/v1/profiles/" + profileCUID
postNodeURL := config.Values.Index.URL + "/v2/nodes"
profileURL := config.Values.DataProxy.URL + "/v1/profiles/" + profileCUID
nodeID, err := importutil.PostIndex(postNodeURL, profileURL)
if err != nil {
return batchID, -1, nil, fmt.Errorf(
Expand Down Expand Up @@ -495,8 +495,8 @@ func (s *batchService) Edit(
}

// Import profile to Index
postNodeURL := config.Conf.Index.URL + "/v2/nodes"
profileURL := config.Conf.DataProxy.URL + "/v1/profiles/" + profileCuid
postNodeURL := config.Values.Index.URL + "/v2/nodes"
profileURL := config.Values.DataProxy.URL + "/v1/profiles/" + profileCuid
nodeID, err := importutil.PostIndex(postNodeURL, profileURL)
if err != nil {
return line, nil, errors.New(
Expand Down Expand Up @@ -533,7 +533,7 @@ func (s *batchService) Edit(
// Delete profiles from Index
if profile["is_posted"].(bool) {
nodeID := profile["node_id"].(string)
deleteNodeURL := config.Conf.Index.URL + "/v2/nodes/" + nodeID
deleteNodeURL := config.Values.Index.URL + "/v2/nodes/" + nodeID
err := importutil.DeleteIndex(deleteNodeURL, nodeID)
if err != nil {
return -1, nil, errors.New(
Expand Down Expand Up @@ -573,7 +573,7 @@ func (s *batchService) Delete(userID string, batchID string) error {
for _, profile := range profiles {
if profile["is_posted"].(bool) {
nodeID := profile["node_id"].(string)
deleteNodeURL := config.Conf.Index.URL + "/v2/nodes/" + nodeID
deleteNodeURL := config.Values.Index.URL + "/v2/nodes/" + nodeID
err := importutil.DeleteIndex(deleteNodeURL, nodeID)
if err != nil {
return errors.New(
Expand Down
2 changes: 1 addition & 1 deletion services/dataproxy/internal/service/parse_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func ParseSchemas(schemas []string) (*SchemasResponse, error) {
// Include default schema and append provided schemas.
schemaNames := append([]string{DefaultSchema}, schemas...)
jsonSchemas := make([]string, len(schemaNames))
baseURL := fmt.Sprintf("%s/v2/schemas", config.Conf.Library.InternalURL)
baseURL := fmt.Sprintf("%s/v2/schemas", config.Values.Library.InternalURL)

for i, schema := range schemaNames {
schemaURL := fmt.Sprintf("%s/%s", baseURL, schema)
Expand Down
2 changes: 1 addition & 1 deletion services/dataproxy/internal/service/parse_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestParseSchemas(t *testing.T) {
defer mockServer.Close()

// Mock the config to use the test server's URL.
config.Conf.Library.InternalURL = mockServer.URL
config.Values.Library.InternalURL = mockServer.URL

// Define test cases.
tests := []struct {
Expand Down
49 changes: 0 additions & 49 deletions services/dataproxy/pkg/dataproxy/application.go

This file was deleted.

4 changes: 0 additions & 4 deletions services/dataproxy/pkg/dataproxy/cleanup.go

This file was deleted.

16 changes: 16 additions & 0 deletions services/dataproxy/pkg/dataproxy/envvar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package dataproxy

import (
"log"

env "github.com/caarlos0/env/v10"

"github.com/MurmurationsNetwork/MurmurationsServices/services/dataproxy/config"
)

func init() {
err := env.Parse(&config.Values)
if err != nil {
log.Fatalf("Failed to decode environment variables: %s", err)
}
}
32 changes: 0 additions & 32 deletions services/dataproxy/pkg/dataproxy/graceful_shutdown.go

This file was deleted.

Loading

0 comments on commit 3b701b7

Please sign in to comment.