Skip to content

Commit

Permalink
Multiroom communication
Browse files Browse the repository at this point in the history
Each client has their own list of chat rooms. Messages are sent to specific chat rooms and only users connected to them receive the messages
  • Loading branch information
0gap committed Dec 11, 2021
1 parent 15d6aed commit a92d093
Showing 1 changed file with 43 additions and 25 deletions.
68 changes: 43 additions & 25 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ import (
"time"
)

type Connection struct {
stream lpb.Broadcast_CreateStreamServer
id string
active bool
error chan error
}

type Server struct {
lpb.UnimplementedBroadcastServer
Connections map[string]*Connection
}

type ChatRoom struct {
id [32]byte
name string
Expand All @@ -26,11 +38,13 @@ type ChatUser struct {
}

var (
grpcLog glog.LoggerV2
serverPort = flag.Int("p", 50051, "The server port")
chatRooms map[string]ChatRoom
users map[string]ChatUser
users2ChatRooms map[string][]string
grpcLog glog.LoggerV2
serverPort = flag.Int("p", 50051, "The server port")
chatRooms map[string]ChatRoom
users map[string]ChatUser
users2ChatRooms map[string][]string
users2connections map[string]*Connection
chat2Connections map[string]map[string]*Connection
)

func init() {
Expand All @@ -39,18 +53,8 @@ func init() {
users2ChatRooms = make(map[string][]string)
grpcLog = glog.NewLoggerV2(os.Stdout, os.Stdout, os.Stdout)
chatRooms = make(map[string]ChatRoom)
}

type Connection struct {
stream lpb.Broadcast_CreateStreamServer
id string
active bool
error chan error
}

type Server struct {
lpb.UnimplementedBroadcastServer
Connections []*Connection
users2connections = make(map[string]*Connection)
chat2Connections = make(map[string]map[string]*Connection)
}

func (s *Server) GetChatRooms(_ context.Context, req *lpb.User) (*lpb.GetChatResp, error) {
Expand All @@ -69,29 +73,44 @@ func (s *Server) CreateChatRoom(_ context.Context, req *lpb.CreateChatReq) (*lpb
if _, roomExists := chatRooms[req.ChatName]; roomExists != true {
timestamp := time.Now()
id := sha256.Sum256([]byte(timestamp.String() + req.ChatName))

chatRooms[req.ChatName] = ChatRoom{id, req.ChatName}
users2ChatRooms[req.User.Name] = append(users2ChatRooms[req.User.Name], req.ChatName)
chatIdStr = append(chatIdStr, req.ChatName)
fmt.Printf("User %s created chat room %s", req.User.Name, req.ChatName)
fmt.Printf("User %s created chat room %s\n", req.User.Name, req.ChatName)
}

users2ChatRooms[req.User.Name] = append(users2ChatRooms[req.User.Name], req.ChatName)
if _, connInCat := chat2Connections[req.ChatName]; connInCat != true {
chat2Connections[req.ChatName] = make(map[string]*Connection)
}
if _, userConnForChat := chat2Connections[req.ChatName][req.User.Name]; userConnForChat != true {
chat2Connections[req.ChatName][req.User.Name] = users2connections[req.User.Name]
}
chatIdStr = append(chatIdStr, req.ChatName)
return &lpb.GetChatResp{ChatNames: chatIdStr}, nil
}

func (s *Server) CreateStream(protoConn *lpb.Connect, stream lpb.Broadcast_CreateStreamServer) error {
conn := &Connection{
stream, protoConn.User.Id, true, make(chan error),
}

if _, userExists := users[protoConn.User.Name]; userExists != true {
for _, chatName := range users2ChatRooms[protoConn.User.Name] {
chat2Connections[chatName][protoConn.User.Name] = conn
}
}
users2connections[protoConn.User.Name] = conn

fmt.Printf("CreateStream from user: %s\n", protoConn.User.Name)
s.Connections = append(s.Connections, conn)
s.Connections[protoConn.User.Name] = conn
return <-conn.error
}

func (s *Server) BroadcastMessage(_ context.Context, msg *lpb.Message) (*lpb.Close, error) {
wait := sync.WaitGroup{}
doneChan := make(chan int)

for _, conn := range s.Connections {
// send the message to each connection that exists in the current chat room
for _, conn := range chat2Connections[msg.ChatName] {
wait.Add(1)

go func(msg *lpb.Message, conn *Connection) {
Expand All @@ -116,15 +135,14 @@ func (s *Server) BroadcastMessage(_ context.Context, msg *lpb.Message) (*lpb.Clo
}

func main() {
var connections []*Connection
flag.Parse()
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", *serverPort))
if err != nil {
log.Fatalf("error creating the server %v", err)
}
grpcServer := grpc.NewServer()
server := &Server{}
server.Connections = connections
server.Connections = users2connections

lpb.RegisterBroadcastServer(grpcServer, server)
grpcLog.Info("Starting server at :", *serverPort)
Expand Down

0 comments on commit a92d093

Please sign in to comment.