From ef836b6afa115f4000e6eb86a30ee80ee1fcb8a0 Mon Sep 17 00:00:00 2001 From: perkzen Date: Wed, 13 Dec 2023 11:53:37 +0100 Subject: [PATCH] feat: add rabbit logging queue --- cmd/server/main.go | 11 ++-- go.mod | 1 + go.sum | 3 + internal/database/mongo.go | 6 +- internal/env/vars.go | 5 +- internal/middlewares/logger.go | 100 +++++++++++++++++++++++++-------- internal/rabbitmq/rabbitmq.go | 51 +++++++++++++++++ 7 files changed, 145 insertions(+), 32 deletions(-) create mode 100644 internal/rabbitmq/rabbitmq.go diff --git a/cmd/server/main.go b/cmd/server/main.go index a6b8054..57c18da 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -7,6 +7,7 @@ import ( "github.com/joho/godotenv" "github.com/mycandys/orders/internal/database" "github.com/mycandys/orders/internal/env" + "github.com/mycandys/orders/internal/rabbitmq" "github.com/mycandys/orders/internal/routes" "github.com/mycandys/orders/internal/swagger" "log" @@ -28,14 +29,12 @@ func main() { panic(err) } - uri, err := env.GetEnvVar(env.DATABASE_URL) - if err != nil { - panic(err) - } - - db := database.Connect(uri) + db := database.Connect() defer database.Disconnect(db, context.Background()) + amqp := rabbitmq.Connect() + defer rabbitmq.Close(amqp) + app := routes.InitRouter() swagger.InitInfo() diff --git a/go.mod b/go.mod index 7b57914..4decd2b 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rabbitmq/amqp091-go v1.9.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/stretchr/objx v0.5.0 // indirect github.com/stretchr/testify v1.8.4 // indirect diff --git a/go.sum b/go.sum index f7a28ab..4ba6ee7 100644 --- a/go.sum +++ b/go.sum @@ -96,6 +96,8 @@ github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6 github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -132,6 +134,7 @@ github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7Jul github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.mongodb.org/mongo-driver v1.13.0 h1:67DgFFjYOCMWdtTEmKFpV3ffWlFnh+CYZ8ZS/tXWUfY= go.mongodb.org/mongo-driver v1.13.0/go.mod h1:/rGBTebI3XYboVmgz+Wv3Bcbl3aD0QF9zl6kDDw18rQ= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.6.0 h1:S0JTfE48HbRj80+4tbvZDYsJ3tGv6BUU3XxyZ7CirAc= golang.org/x/arch v0.6.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= diff --git a/internal/database/mongo.go b/internal/database/mongo.go index bbdfb8a..c7e63e1 100644 --- a/internal/database/mongo.go +++ b/internal/database/mongo.go @@ -10,7 +10,11 @@ import ( var Db *mongo.Database -func Connect(uri string) *mongo.Client { +func Connect() *mongo.Client { + uri, err := env.GetEnvVar(env.DATABASE_URL) + if err != nil { + panic(err) + } ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() diff --git a/internal/env/vars.go b/internal/env/vars.go index 44a2207..35d6983 100644 --- a/internal/env/vars.go +++ b/internal/env/vars.go @@ -7,5 +7,8 @@ const ( CART_SERVICE_URL = "CART_SERVICE_URL" NOTIFICATIONS_SERVICE_URL = "NOTIFICATIONS_SERVICE_URL" AUTH_SERVICE_URL = "AUTH_SERVICE_URL" - SWAGGER_URI = "SWAGGER_URI" + SWAGGER_URI = "SWAGGER_URI" + RABBITMQ_URL = "RABBITMQ_URL" + EXCHANGE_NAME = "EXCHANGE_NAME" + QUEUE_NAME = "QUEUE_NAME" ) diff --git a/internal/middlewares/logger.go b/internal/middlewares/logger.go index 38bbd4c..0958d58 100644 --- a/internal/middlewares/logger.go +++ b/internal/middlewares/logger.go @@ -2,13 +2,64 @@ package middlewares import ( "bytes" + "encoding/json" + "fmt" "github.com/gin-gonic/gin" "github.com/google/uuid" + "github.com/mycandys/orders/internal/env" + "github.com/mycandys/orders/internal/rabbitmq" "github.com/sirupsen/logrus" + "log" "net/http" "time" ) +type Log struct { + Timestamp string `json:"timestamp"` + CorrelationId string `json:"correlationId"` + Url string `json:"url"` + Message string `json:"message"` + Service string `json:"service"` + Type string `json:"type"` +} + +func NewLog(correlationId string, url string, message string, service string) *Log { + return &Log{ + Timestamp: time.Now().Format(time.RFC3339), + CorrelationId: correlationId, + Url: url, + Message: message, + Service: service, + } +} + +func (l *Log) setType(logType string) { + l.Type = logType +} + +func (m *Middleware) LogFields(log *Log) *logrus.Entry { + return m.logger.WithFields(logrus.Fields{ + "timestamp": log.Timestamp, + "type": log.Type, + "correlationId": log.CorrelationId, + "url": log.Url, + "message": log.Message, + "service": log.Service, + }) +} + +func (m *Middleware) LogError(log *Log) { + m.LogFields(log).Error("Request received") +} + +func (m *Middleware) LogWarning(log *Log) { + m.LogFields(log).Warn("Request received") +} + +func (m *Middleware) LogInfo(log *Log) { + m.LogFields(log).Info("Request received") +} + type bodyLogWriter struct { gin.ResponseWriter body *bytes.Buffer @@ -30,40 +81,41 @@ func (m *Middleware) Logger() gin.HandlerFunc { c.Header("X-Correlation-Id", correlationId) - timestamp := time.Now().Format(time.RFC3339) - - fullUrl := c.Request.Host + c.Request.URL.RequestURI() + url := c.Request.Host + c.Request.URL.RequestURI() blw := &bodyLogWriter{body: bytes.NewBufferString(""), ResponseWriter: c.Writer} c.Writer = blw c.Next() status := c.Writer.Status() + + method := c.Request.Method + + msg := fmt.Sprintf("%s - %s ", method, c.Request.URL.Path) + + serviceLog := NewLog(correlationId, url, msg, "orders") + switch { case status >= http.StatusInternalServerError: - m.logger.WithFields(logrus.Fields{ - "timestamp": timestamp, - "logType": "Error", - "url": fullUrl, - "correlationId": correlationId, - "service": "orders", - }).Error("Request received") + serviceLog.setType("error") + m.LogError(serviceLog) case status >= http.StatusBadRequest && status < http.StatusInternalServerError: - m.logger.WithFields(logrus.Fields{ - "timestamp": timestamp, - "logType": "Warning", - "url": fullUrl, - "correlationId": correlationId, - "service": "orders", - }).Warning("Request received") + serviceLog.setType("warning") + m.LogWarning(serviceLog) default: - m.logger.WithFields(logrus.Fields{ - "timestamp": timestamp, - "logType": "Info", - "url": fullUrl, - "correlationId": correlationId, - "service": "orders", - }).Info("Request received") + serviceLog.setType("info") + m.LogInfo(serviceLog) + } + + queueName, _ := env.GetEnvVar(env.QUEUE_NAME) + exchangeName, _ := env.GetEnvVar(env.EXCHANGE_NAME) + + queue := rabbitmq.DeclareQueue(queueName) + + payload, err := json.Marshal(serviceLog) + if err != nil { + log.Fatal(err) } + rabbitmq.Publish(exchangeName, queue.Name, payload, c.Request.Context()) } } diff --git a/internal/rabbitmq/rabbitmq.go b/internal/rabbitmq/rabbitmq.go new file mode 100644 index 0000000..2320488 --- /dev/null +++ b/internal/rabbitmq/rabbitmq.go @@ -0,0 +1,51 @@ +package rabbitmq + +import ( + "context" + "github.com/mycandys/orders/internal/env" + amqp "github.com/rabbitmq/amqp091-go" + "log" +) + +var Ch *amqp.Channel + +func Connect() *amqp.Connection { + url, _ := env.GetEnvVar(env.RABBITMQ_URL) + conn, err := amqp.Dial(url) + if err != nil { + log.Fatalf("Error connecting to RabbitMQ: %v", err) + + } + + Ch, err = conn.Channel() + if err != nil { + log.Fatalf("Error opening RabbitMQ channel: %v", err) + } + + return conn +} + +func Close(conn *amqp.Connection) { + if err := conn.Close(); err != nil { + log.Fatalf("Error closing RabbitMQ connection: %v", err) + } +} + +func DeclareQueue(name string) amqp.Queue { + q, err := Ch.QueueDeclare(name, true, false, false, false, nil) + if err != nil { + log.Fatalf("Error declaring RabbitMQ queue: %v", err) + } + return q +} + +func Publish(exchangeName string, queueName string, body []byte, ctx context.Context) { + err := Ch.PublishWithContext(ctx, exchangeName, queueName, false, false, amqp.Publishing{ + ContentType: "application/json", + Body: body, + }) + if err != nil { + log.Fatalf("Error publishing message to RabbitMQ: %v", err) + } + +}