Skip to content

Commit 80c0e9e

Browse files
committed
Upload Project Structure
1 parent 75416f8 commit 80c0e9e

27 files changed

+268
-0
lines changed

cmd/SERVER/main.go

Whitespace-only changes.

config.yaml

Whitespace-only changes.

data/topics/my_test_topic_0/000001.index

Whitespace-only changes.

data/topics/my_test_topic_0/000001.log

Whitespace-only changes.

go.mod

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module ryanMQ
2+
3+
go 1.22.5
4+
5+
require github.com/golang/snappy v0.0.4

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
2+
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=

internal/core/broker/dispatch.go

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
/*
2+
消息路由
3+
根据消息Key哈希到指定分区 (实现负载均衡)
4+
5+
​关键技术:
6+
用 sync.Map 缓存主题元数据
7+
分区选择算法:hash(key) % partition_num
8+
*/

internal/core/broker/readme

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# 消息调度中枢

internal/core/broker/replication.go

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
/*
2+
副本同步 分布式支持,暂不考虑
3+
*/

internal/core/broker/topic_manager.go

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/*
2+
主题管理:创建、删除主题
3+
维护主题(Topic)与分区(Partition)的映射关系
4+
*/
+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/*
2+
持久化消费者Offset到 LevelDB/BoltDB
3+
提供Commit(Offset) 和 Fetch()接口
4+
*/
+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
分区元信息
3+
4+
记录Leader/Follower 节点 (分布式需求)
5+
维护ISR列表(In-Sync Replica)
6+
7+
​关键技术:
8+
用嵌入式数据库(如 BoltDB)存储元数据
9+
*/
+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
处理消费者请求
3+
4+
根据Offset读取消息
5+
支持长轮询 (Long Polling)
6+
7+
​关键技术:
8+
基于 io.Reader/io.Writer 实现协议解析
9+
用 goroutine 池处理并发请求
10+
*/
+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
消息编解码
3+
4+
定义二进制协议格式(参考Kafka)
5+
处理压缩(Snappy)
6+
*/
7+
package protocol
8+
9+
import (
10+
"bytes"
11+
"encoding/binary"
12+
"ryanMQ/internal/utils/log"
13+
14+
"github.com/golang/snappy"
15+
)
16+
17+
const (
18+
ProduceRequestType = byte(0x01)
19+
FetchRequestType = byte(0x02)
20+
OffsetCommitType = byte(0x03)
21+
TopicMetadataRequestType = byte(0x04)
22+
)
23+
24+
func EncodeProduceRequest(req *ProduceRequest) ([]byte, error) {
25+
buf := new(bytes.Buffer) //写入缓冲区
26+
27+
//主题 :2 字节长度(int16类型) + 数据
28+
topicBytes := []byte(req.Topic)
29+
//大端字节序存储
30+
if err := binary.Write(buf, binary.BigEndian, int16(len(topicBytes))); err != nil {
31+
return nil, err
32+
}
33+
buf.Write(topicBytes)
34+
35+
//分区号:4字节
36+
binary.Write(buf, binary.BigEndian, req.Partition)
37+
38+
//消息列表 (每条消息: 4字节 长度(len) + 数据)
39+
for _, msg := range req.Messages {
40+
binary.Write(buf, binary.BigEndian, int32(len(msg)))
41+
buf.Write(msg)
42+
}
43+
44+
//构建最终请求(添加请求头和长度)
45+
header := RequestHeader{
46+
Length: int32(buf.Len()),
47+
RequestType: ProduceRequestType, //生产者请求
48+
}
49+
headerBytes, err := encodeHeader(header)
50+
if err != nil {
51+
log.Error("encode header error: %v", err)
52+
return nil, err
53+
}
54+
55+
return append(headerBytes, buf.Bytes()...), nil
56+
}
57+
58+
func encodeHeader(header RequestHeader) ([]byte, error) {
59+
buf := new(bytes.Buffer)
60+
61+
if err := binary.Write(buf, binary.BigEndian, header.Length); err != nil {
62+
return nil, err
63+
}
64+
65+
if err := binary.Write(buf, binary.BigEndian, header.RequestType); err != nil {
66+
return nil, err
67+
}
68+
69+
return buf.Bytes(), nil
70+
}
71+
72+
func DecodeProduceRequest(data []byte) (*ProduceRequest, error) {
73+
buf := bytes.NewBuffer(data)
74+
req := &ProduceRequest{}
75+
76+
//解析请求头
77+
78+
}
79+
80+
func EncodeConsumeRequest()
81+
82+
func CompressWithSnappy(data []byte) ([]byte, error) {
83+
return snappy.Encode(nil, data), nil
84+
}
85+
86+
func DecompressWithSnappy(data []byte) ([]byte, error) {
87+
return snappy.Decode(nil, data)
88+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
/*
2+
处理生产者请求
3+
4+
批量写入消息
5+
返回写入成功的Offset
6+
*/

internal/core/protocol/readme

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# 协议处理

internal/core/protocol/types.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package protocol
2+
3+
type RequestHeader struct {
4+
Length int32 //消息长度
5+
RequestType byte //请求类型
6+
}
7+
8+
// 生产者请求
9+
type ProduceRequest struct {
10+
Topic string
11+
Partition int32
12+
Messages [][]byte //多条消息, 支持批量
13+
}
14+
15+
type ProduceResponse struct {
16+
Status byte //成功/失败
17+
Offset int64 //写入的Offset
18+
}
19+
20+
type FetchRequest struct {
21+
Topic string
22+
Partition int32
23+
StartOffset int64 //消费起始位置
24+
MaxMsgNum int32 //最多消费(拉取)多少条
25+
}
26+
27+
type FetchResponse struct {
28+
Status byte //成功/失败
29+
NextOffset int64 //下次拉取的起始位置
30+
Messages [][]byte //实际返回的消息
31+
}

internal/core/storage/patition.go

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
维护分区下的多个segment
3+
根据时间/空间策略清理旧数据 retention策略
4+
5+
​关键技术:
6+
使用 os.File + bufio.Writer 实现缓冲写入
7+
​零拷贝读取:syscall.Sendfile 直接发送文件内容
8+
​内存映射:syscall.Mmap 加速索引文件读取
9+
10+
*/

internal/core/storage/readme

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# 存储引擎

internal/core/storage/segment.go

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
/*
2+
管理单个分段文件 (.log + .index)
3+
实现消息追加 func Append(msg []byte)
4+
构建稀疏索引 4KB/条
5+
处理文件滚动 达到 segment.max.size 后创建新文件
6+
*/

internal/core/storage/wal.go

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/*
2+
预写日志 (Write-Ahead Log, 用于未刷盘数据的恢复)
3+
4+
*/

internal/network/tcp/connection.go

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
连接管理
3+
4+
封包/拆包 (处理粘包)
5+
心跳维护 (保活机制)
6+
7+
8+
基于 net.Listener 实现非阻塞 IO
9+
使用 goroutine-per-connection 模型(Go 高并发优势)
10+
*/

internal/network/tcp/server.go

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
/*
2+
TCP服务器
3+
4+
监听端口,接受连接
5+
为每个连接启动处理协程
6+
*/

internal/utils/log/log.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package log
2+
3+
import (
4+
"log"
5+
)
6+
7+
var (
8+
DebugEnabled = true //默认开启 Debug
9+
)
10+
11+
func Debug(format string, v ...any) {
12+
if DebugEnabled {
13+
log.Printf("[DEBUG] "+format, v...)
14+
}
15+
}
16+
17+
func Error(format string, v ...any) {
18+
log.Printf("[ERROR] "+format, v...)
19+
}

pkg/api/consumer.go

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
consumer.go:消费者接口
3+
Subscribe(topic string) <-chan Message
4+
Ack(offset int) 手动提交 Offset
5+
6+
关键技术:
7+
基于 net.Dial 实现客户端通信
8+
用 channel 实现消息推送
9+
*/

pkg/api/producer.go

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/*
2+
producer.go:生产者接口
3+
Send(msg []byte) (offset int, err error)
4+
支持同步/异步发送
5+
*/

tutorial.md

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# 存储设计
2+
3+
- [ ] 分段文件 + 内存映射(mmap)减少磁盘IO
4+
- [ ] 索引文件 跳表结构 (offset --> 物理地址)
5+
- [ ] 多副本同步写入 (ISR)
6+
7+
顺序写入: 避免随机IO
8+
分段存储(segment 对应一个 offset 快速定位 清理过期数据) 稀疏索引
9+
直接利用操作系统页缓存 磁盘文件映射到内存
10+
11+
零拷贝传输 : sendfile系统调用,直接将磁盘数据通过DMA传输到网卡
12+
13+
作batch单位传输 将数据写到页缓存,操作系统异步刷盘 flush.internal.ms 控制刷盘频率
14+
15+
:ELK日志系统
16+
17+
按时间和bytes大小删除过期segment
18+
**故障容错和恢复**
19+
​Checksum 校验:每条消息包含 CRC32 校验码,防止数据损坏。
20+
​恢复日志(Recovery Log)​:Broker 重启时通过读取 Segment 文件和索引重建状态,而非依赖外部数据库。
21+
​可借鉴点
22+
​自包含存储:将元数据与数据统一存储(如将索引与数据文件绑定),避免外部依赖。
23+
​数据完整性:在关键路径添加校验机制(如网络传输、持久化存储)。
24+
内存**池化** 减少GC垃圾回收压力
25+
事件驱动模型 :
26+
FileChannel.mmap 频繁读写时 将文件映射到内存

0 commit comments

Comments
 (0)