Skip to content

Commit

Permalink
Merge pull request #49 from igh9410/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
igh9410 authored May 7, 2024
2 parents 7ffa43d + cd1ab89 commit cd59820
Show file tree
Hide file tree
Showing 14 changed files with 142 additions and 49 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
./database/*
.env
.env.*
k8s
./k8s/configmaps/*
22 changes: 22 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Makefile`

# Run the application
run:
@echo "Running the Kubernetes cluster..."
@kubectl apply -f ./k8s/services/
@kubectl apply -f ./k8s/deployments/
@kubectl apply -f ./k8s/configmaps/

shutdown:
@echo "Shutting down the Kubernetes cluster..."

@kubectl scale --replicas=0 deployment blabber-hive
@kubectl scale --replicas=0 deployment broker
@kubectl scale --replicas=0 deployment fastapi
@kubectl scale --replicas=0 deployment grafana
@kubectl scale --replicas=0 deployment kafka-setup
@kubectl scale --replicas=0 deployment nginx
@kubectl scale --replicas=0 deployment postgres
@kubectl scale --replicas=0 deployment prometheus
@kubectl scale --replicas=0 deployment redis
@kubectl scale --replicas=0 deployment zookeeper
2 changes: 1 addition & 1 deletion backend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ docker-run:

# Run golangci-lint
linter:
golangci-lint run
@golangci-lint run

# Create database migration file
create-migration:
Expand Down
30 changes: 21 additions & 9 deletions backend/cmd/blabber-hive/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,29 @@ func main() {
log.Printf("Error closing the Redis connection: %s", err)
}
}()
/*
kafkaClient, err := kafka.NewKafkaClient()
if err != nil {
log.Printf("Failed to initialize Kafka cluster connection")
}
defer kafkaClient.Close()
kafkaClient, err := kafka.NewKafkaClient()
if err != nil {
log.Printf("Failed to initialize Kafka cluster connection")
}
defer kafkaClient.Close()
kafkaProducer, err := kafka.KafkaProducer()
if err != nil {
log.Printf("Failed to initialize Kafka producer")
}
defer kafkaProducer.Close() */

kafkaProducer, err := kafka.KafkaProducer()
if err != nil {
log.Printf("Failed to initialize Kafka producer")
}
// Start the Kafka reconnection loop
stopReconnect := make(chan struct{})
kafkaConnected, kafkaProducer := kafka.ReconnectLoop(stopReconnect)

// Wait for successful Kafka connection
<-kafkaConnected

// Use the kafkaProducer instance
defer kafkaProducer.Close()
// ... (rest of the code that uses kafkaProducer)

userRep := user.NewRepository(dbConn.GetDB())
userSvc := user.NewService(userRep)
Expand Down Expand Up @@ -112,6 +123,7 @@ func main() {
MatchHandler: matchHandler,
// Future handlers can be added here without changing the InitRouter signature
}
close(stopReconnect)

router.InitRouter(routerConfig)
cancel()
Expand Down
6 changes: 5 additions & 1 deletion backend/infra/kafka/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ func NewInsertFunc(db *sql.DB) func([]chat.Message) error {
strings.Join(valueStrings, ","))
_, err = tx.Exec(stmt, valueArgs...)
if err != nil {
tx.Rollback() // Rollback in case of error
if rollbackErr := tx.Rollback(); rollbackErr != nil {
// Log or handle the rollback error appropriately
log.Printf("Error rolling back transaction: %s", rollbackErr)
return rollbackErr
}
return err
}

Expand Down
54 changes: 54 additions & 0 deletions backend/infra/kafka/reconnect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package kafka

import (
"log"
"time"

confluentKafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// ReconnectLoop continuously attempts to establish a Kafka connection
// and sends a signal on the returned channel when a connection is made.
// The loop will continue to run until the provided stop channel is closed.
// It also returns the Kafka producer instance.
func ReconnectLoop(stop chan struct{}) (chan struct{}, *confluentKafka.Producer) {
connected := make(chan struct{})
var producer *confluentKafka.Producer

go func() {
for {
select {
case <-stop:
log.Println("Stopping Kafka reconnection loop")
close(connected)
if producer != nil {
producer.Close()
}
return
default:
kafkaClient, err := NewKafkaClient()
if err != nil {
log.Printf("Failed to initialize Kafka cluster connection: %s", err)
time.Sleep(30 * time.Second) // Wait for 30 seconds before retrying
continue
}
defer kafkaClient.Close()

producer, err = KafkaProducer()
if err != nil {
log.Printf("Failed to initialize Kafka producer: %s", err)
time.Sleep(30 * time.Second) // Wait for 30 seconds before retrying
continue
}

log.Println("Kafka connection established")
connected <- struct{}{}

// Wait for the connection to be closed (e.g., due to an error)
<-connected
}
}
}()

return connected, producer
}
2 changes: 1 addition & 1 deletion backend/infra/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func NewRedisClient() (*RedisClient, error) {
// Parse the Redis URL
opt, err := redis.ParseURL(redisURL)
if err != nil {
slog.Error("Failed to parse Redis URL: %s\n", err)
slog.Error("Failed to parse Redis URL: %s\n", err.Error(), " in NewRedisClient")
}

client := redis.NewClient(opt)
Expand Down
33 changes: 10 additions & 23 deletions backend/internal/chat/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewRepository(db DBTX) Repository {
func (r *repository) CreateChatRoom(ctx context.Context, chatRoom *ChatRoom) (*ChatRoom, error) {
tx, err := r.db.BeginTx(ctx, nil)
if err != nil {
slog.Error("Creating Chatroom transaction failed")
slog.Error("Creating Chatroom transaction failed: ", err.Error(), " in CreateChatRoom")
return nil, err // handle error appropriately
}
// Generate a UUID for user ID
Expand All @@ -51,22 +51,9 @@ func (r *repository) CreateChatRoom(ctx context.Context, chatRoom *ChatRoom) (*C
// Set the current timestamp for CreatedAt
chatRoom.CreatedAt = time.Now()

query := "INSERT INTO chat_rooms(id, name, created_at) VALUES ($1, $2, $3) RETURNING id"

err = r.db.QueryRowContext(ctx, query, chatRoom.ID, chatRoom.Name, chatRoom.CreatedAt).Scan(&chatRoom.ID)
if err != nil {
log.Printf("Error creating chat room: %v", err)
if rbErr := tx.Rollback(); rbErr != nil {
slog.Error("Transaction rollback failed: %v", rbErr)
}

return nil, errors.New("failed to create chat room")
}

// Commit the transaction on success
if err = tx.Commit(); err != nil {
slog.Error("Transaction commit failed: ", err)
return nil, err
if rbErr := tx.Rollback(); rbErr != nil {
slog.Error("Transaction rollback failed: %v", rbErr.Error(), " in CreateChatRoom")
return nil, rbErr
}

return chatRoom, nil
Expand Down Expand Up @@ -113,7 +100,7 @@ func (r *repository) FindChatRoomInfoByID(ctx context.Context, chatRoomID uuid.U
var userInChatRoom UserInChatRoom
err := rows.Scan(&userInChatRoom.ID, &userInChatRoom.UserID, &userInChatRoom.ChatRoomID, &createdAt)
if err != nil {
log.Printf("Failed to scan chat room info, err: %v", err)
log.Printf("Failed to scan chat room info, err: %v", err.Error())
return nil, err
}
usersInChatRoom = append(usersInChatRoom, userInChatRoom)
Expand All @@ -137,7 +124,7 @@ func (r *repository) JoinChatRoomByID(ctx context.Context, chatRoomID uuid.UUID,
defer func() {
if err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
slog.Error("Transaction rollback failed: %v", rbErr)
slog.Error("Transaction rollback failed: ", rbErr.Error(), " in JoinChatRoomByID")
}
}
}()
Expand All @@ -146,13 +133,13 @@ func (r *repository) JoinChatRoomByID(ctx context.Context, chatRoomID uuid.UUID,
query := `INSERT INTO users_in_chat_rooms (user_id, chat_room_id) VALUES ($1, $2)`
_, err = tx.ExecContext(ctx, query, userID, chatRoomID)
if err != nil {
slog.Error("Error joining chat room, db execcontext: ", err)
slog.Error("Error joining chat room, db execcontext: ", err.Error(), " in JoinChatRoomByID")
return nil, err
}

// Commit transaction
if err = tx.Commit(); err != nil {
slog.Error("Transaction commit failed: ", err)
slog.Error("Transaction commit failed: ", err.Error(), " in JoinChatRoomByID")
return nil, err
}
chatRoom := &ChatRoom{
Expand All @@ -168,7 +155,7 @@ func (r *repository) FindChatRoomList(ctx context.Context) ([]*ChatRoom, error)

rows, err := r.db.QueryContext(ctx, query)
if err != nil {
slog.Error("Error occured with finding chat room list: ", err)
slog.Error("Error occured with finding chat room list: ", err.Error(), " in FindChatRoomList")
return nil, err
}
defer rows.Close()
Expand All @@ -181,7 +168,7 @@ func (r *repository) FindChatRoomList(ctx context.Context) ([]*ChatRoom, error)

err := rows.Scan(&chatRoom.ID, &name, &chatRoom.CreatedAt)
if err != nil {
slog.Error("Error occurred while scanning chat room list: ", err)
slog.Error("Error occurred while scanning chat room list: ", err.Error(), " in FindChatRoomList")
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion backend/internal/chat/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *service) GetChatRoomInfoByID(ctx context.Context, chatRoomID uuid.UUID)
func (s *service) GetChatRoomList(ctx context.Context) ([]*ChatRoom, error) {
chatRoomList, err := s.Repository.FindChatRoomList(ctx)
if err != nil {
slog.Error("Error occured with finding chat room list: ", err)
slog.Error("Error occured with finding chat room list: ", err.Error(), "in service.GetChatRoomList")
return nil, err
}
return chatRoomList, nil
Expand Down
8 changes: 4 additions & 4 deletions backend/internal/match/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *repository) EnqueueUser(ctx context.Context, userID uuid.UUID) (*EnqueU
added, err := r.redisClient.SAdd(ctx, "match_queue", userID.String()).Result()

if err != nil {
slog.Error("Error adding user to the matchmaking queue:", err)
slog.Error("Error adding user to the matchmaking queue:", err.Error(), " in EnqueueUser")
return nil, err
}
if added == 0 { // User already in the queue
Expand All @@ -50,7 +50,7 @@ func (r *repository) DequeueUser(ctx context.Context, userID string) error {
// Use SREM command
_, err := r.redisClient.SRem(ctx, "match_queue", userID).Result()
if err != nil {
slog.Error("Error removing user from the matchmaking queue:", err)
slog.Error("Error removing user from the matchmaking queue:", err.Error(), " in DequeueUser")
return err
}
slog.Info(fmt.Sprintf("User with ID %s removed from the matchmaking queue", userID))
Expand All @@ -62,7 +62,7 @@ func (r *repository) FetchCandidates(ctx context.Context) ([]string, error) {
// For example, if you're using a Redis SET, you might use SMEMBERS to get all members
candidates, err := r.redisClient.SMembers(ctx, "match_queue").Result()
if err != nil {
slog.Error("Error fetching candidates from the matchmaking queue:", err)
slog.Error("Error fetching candidates from the matchmaking queue:", err.Error(), " in FetchCandidates")
return nil, err
}

Expand All @@ -76,7 +76,7 @@ func (r *repository) DequeueUsers(ctx context.Context, userIDs ...string) error

_, err := r.redisClient.SRem(ctx, "match_queue", userIDs).Result()
if err != nil {
slog.Error("Error removing users from the matchmaking queue:", err)
slog.Error("Error removing users from the matchmaking queue:", err.Error(), " in DequeueUsers")
return err
}

Expand Down
10 changes: 5 additions & 5 deletions backend/internal/match/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ func (s *service) EnqueueUser(c context.Context, userID uuid.UUID) (*EnqueUserRe
r, err := s.Repository.EnqueueUser(ctx, userID)

if err != nil {
slog.Error("Error enqueuing user:", err)
slog.Error("Error enqueuing user:", err.Error(), " in EnqueueUser")
return nil, err
}

// Publish an event to the Redis channel
err = s.redisClient.Publish(ctx, "matchmaking_channel", userID.String()).Err()
if err != nil {
slog.Error("Error publishing to matchmaking channel:", err)
slog.Error("Error publishing to matchmaking channel:", err.Error(), " in EnqueueUser")
return nil, err
}
slog.Info("User enqueued successfully")
Expand Down Expand Up @@ -98,7 +98,7 @@ func (s *service) performMatchmaking(userID string) error {
// Fetch potential match candidates
candidates, err := s.Repository.FetchCandidates(context.Background())
if err != nil {
slog.Error("Error fetching match candidates:", err)
slog.Error("Error fetching match candidates:", err.Error(), " in performMatchmaking")
return err
}

Expand All @@ -119,7 +119,7 @@ func (s *service) performMatchmaking(userID string) error {
// Handle the match
err = s.handleMatch(userID, matchID)
if err != nil {
slog.Error("Error handling match:", err)
slog.Error("Error handling match:", err.Error(), " in performMatchmaking")
return err
}
// Rest of the matchmaking logic...
Expand All @@ -135,7 +135,7 @@ func (s *service) handleMatch(userID, matchID string) error {
err := s.Repository.DequeueUsers(ctx, removedIDs...)
// Code goes here
if err != nil {
slog.Error("Error dequeuing users:", err)
slog.Error("Error dequeuing users:", err.Error(), " in handleMatch")
return err
}
chatRoom := &chat.ChatRoom{}
Expand Down
2 changes: 1 addition & 1 deletion backend/internal/user/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *service) CreateUser(c context.Context, req *CreateUserReq, userID uuid.

r, err := s.Repository.CreateUser(ctx, u)
if err != nil {
slog.Error("Error creating user:", err)
slog.Error("Error creating user:", err.Error(), " in CreateUser")
return nil, err
}

Expand Down
18 changes: 15 additions & 3 deletions nginx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ events {
}

http {
upstream chatbackend {
upstream blabber-hive {
server blabber-hive:8080;
}

Expand All @@ -18,7 +18,7 @@ http {
server_name localhost;

location /api/ {
proxy_pass http://chatbackend;
proxy_pass http://blabber-hive;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
Expand All @@ -31,10 +31,22 @@ http {
# Custom location block to handle 401 and 403 responses
location @handle_errors {
# Forward the original response from the upstream server
proxy_pass http://chatbackend;
proxy_pass http://blabber-hive;
proxy_intercept_errors off;
}

# WebSocket location block
location /ws/ {
proxy_pass http://blabber-hive/ws/;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_cache_bypass $http_upgrade;
#proxy_set_header X-Real-IP $remote_addr;
#proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}

location /ml/api/ {
proxy_pass http://fastapi/api/;
proxy_set_header Host $host;
Expand Down
Empty file added terraform/frontend.tf
Empty file.

0 comments on commit cd59820

Please sign in to comment.