Skip to content

Commit

Permalink
feat(rdma): Add rdma bench mark
Browse files Browse the repository at this point in the history
Signed-off-by: Wu Huocheng <[email protected]>
  • Loading branch information
aaronwu2010 committed Dec 4, 2024
1 parent 49233e0 commit 65726f6
Show file tree
Hide file tree
Showing 42 changed files with 6,186 additions and 0 deletions.
67 changes: 67 additions & 0 deletions util/rdma/rdma_benchmark/README.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
�ֵ���������̨�����ϣ�֮��ֱ����client��server��ִ��go build��֮������������

[service@server-machine server]$ ./server -h
Usage of ./server:
-deep int
io deep (default 1)
-ip string
127.0.0.1 (default "127.0.0.1")
-port string
9000 (default "9000")
-protocol string
rdma/tcp (default "rdma")
-size int
io size (default 4096)

[service@client-machine client]$ ./client -h
Usage of ./client:
-deep int
io deep (default 1)
-ip string
127.0.0.1 (default "127.0.0.1")
-port string
9000 (default "9000")
-protocol string
rdma/tcp (default "rdma")
-size int
io size (default 4096)


-----------------------------------------------------------------------------------------------------------------------

[service@client-machine client]$ ./client -ip 192.168.12.100 -protocol tcp -port 17370 -ip 192.168.12.100
param {1 4096 tcp 192.168.12.100 17370}
IOPS=[10792.00], TOTAL_BPS=[84.31M], AVG_TM=[92.55us]
IOPS=[10316.00], TOTAL_BPS=[80.59M], AVG_TM=[96.83us]
IOPS=[9843.33], TOTAL_BPS=[76.90M], AVG_TM=[101.49us]
IOPS=[9784.50], TOTAL_BPS=[76.44M], AVG_TM=[102.10us]
IOPS=[9676.60], TOTAL_BPS=[75.60M], AVG_TM=[103.24us]
^C
[service@client-machine client]$ ./client -ip 192.168.12.100 -protocol rdma -port 17370 -ip 192.168.12.100
param {1 4096 rdma 192.168.12.100 17370}
IOPS=[54937.00], TOTAL_BPS=[429.20M], AVG_TM=[16.69us]
IOPS=[59708.50], TOTAL_BPS=[466.48M], AVG_TM=[16.03us]
IOPS=[61561.00], TOTAL_BPS=[480.95M], AVG_TM=[15.76us]

-----------------------------------------------------------------------------------------------------------------------
[service@server-machine server]$ ./server -protocol tcp -port 17370 -ip 192.168.12.100
param {1 4096 tcp 192.168.12.100 17370}
IOPS=[0.00], TOTAL_BPS=[0.00M], AVG_TM=[NaNus]
IOPS=[0.00], TOTAL_BPS=[0.00M], AVG_TM=[NaNus]
IOPS=[0.00], TOTAL_BPS=[0.00M], AVG_TM=[NaNus]
IOPS=[10017.00], TOTAL_BPS=[78.26M], AVG_TM=[92.40us]
IOPS=[10007.00], TOTAL_BPS=[78.18M], AVG_TM=[96.18us]
IOPS=[9610.33], TOTAL_BPS=[75.08M], AVG_TM=[101.43us]
IOPS=[9617.75], TOTAL_BPS=[75.14M], AVG_TM=[102.00us]
IOPS=[9543.20], TOTAL_BPS=[74.56M], AVG_TM=[103.18us]
^C
[service@server-machine server]$
[service@server-machine server]$ ./server -protocol rdma -port 17370 -ip 192.168.12.100
param {1 4096 rdma 192.168.12.100 17370}
IOPS=[0.00], TOTAL_BPS=[0.00M], AVG_TM=[NaNus]
IOPS=[0.00], TOTAL_BPS=[0.00M], AVG_TM=[NaNus]
IOPS=[59072.00], TOTAL_BPS=[461.50M], AVG_TM=[16.69us]
IOPS=[62272.50], TOTAL_BPS=[486.50M], AVG_TM=[15.91us]
IOPS=[62930.33], TOTAL_BPS=[491.64M], AVG_TM=[15.78us]

-----------------------------------------------------------------------------------------------------------------------
Binary file added util/rdma/rdma_benchmark/client/client
Binary file not shown.
126 changes: 126 additions & 0 deletions util/rdma/rdma_benchmark/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package main

import (
"net"
"rdma_test/common"
"rdma_test/rdma"
"time"
)

var exit = false
var Config = &rdma.RdmaEnvConfig{}

func ReadBytes(conn net.Conn, buf []byte) error {
offset := 0
for offset < len(buf) {
n, err := conn.Read(buf[offset:])
if n == -1 || (err != nil) {
println("ReadBytes failed, err: ", err)
return err
}
offset += n
}
return nil
}


func testRdma() {
if err := rdma.InitPool(Config); err != nil {
println("init rdma pool failed")
return
}

for i := 0; i < common.GParam.IoDeep; i++ {

go func() {
conn := &rdma.Connection{}
if err := conn.Dial(common.GParam.Ip,common.GParam.Port); err != nil {
println("client rdma conn dial failed")
return
}
defer conn.Close()
for !exit {
beginTm := time.Now()
p := common.NewWritePacket(common.NormalExtentType, conn)
p.Size = uint32(common.GParam.IoSize)

if err := p.WriteToRDMAConn(conn); err != nil {
println(err.Error())
break
}

reply := common.NewReply(p.ReqID)

if err := reply.RecvRespFromRDMAConn(conn, 5); err != nil {
println(err.Error())
break
}

err := conn.ReleaseConnRxDataBuffer(reply.RdmaBuffer)
if err != nil {
println(err.Error())
break
}
err = rdma.ReleaseDataBuffer(conn, p.RdmaBuffer, uint32(105 + common.GParam.IoSize))
if err != nil {
println(err.Error())
break
}

common.Stat().AddSumTime(common.GParam.IoSize, time.Now().UnixNano() / 1000 - beginTm.UnixNano() / 1000)
}
}()
}
}

func testTcp() {
common.InitBufferPool(0)

for i := 0; i < common.GParam.IoDeep; i++ {
go func() {
c, err := net.Dial("tcp", common.GParam.Ip + ":" + common.GParam.Port)
if err != nil {
println(err.Error())
return
}
conn, _ := c.(*net.TCPConn)
conn.SetKeepAlive(true)
conn.SetNoDelay(true)
defer conn.Close()
for !exit {
beginTm := time.Now()
p := common.NewWritePacket(common.NormalExtentType, conn)
p.Size = uint32(common.GParam.IoSize)

if err := p.WriteToConn(conn); err != nil {
println(err.Error())
break
}

reply := common.NewReply(p.ReqID)
if err := reply.ReadFromConn(conn, 5); err != nil {
println(err.Error())
break
}

common.Stat().AddSumTime(common.GParam.IoSize, time.Now().UnixNano() / 1000 - beginTm.UnixNano() / 1000)
}
}()
}
}


func main() {
common.ParseParam()
if common.GParam.Protocol == "rdma" {
go testRdma()
} else {
go testTcp()
}

for !exit {
time.Sleep(time.Millisecond * 1000)
common.Stat().Print()
}
}

Empty file.
178 changes: 178 additions & 0 deletions util/rdma/rdma_benchmark/common/buffer_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package common

import (
"fmt"
"rdma_test/common/context"
"rdma_test/common/rate"
"sync"
"sync/atomic"
)

const (
HeaderBufferPoolSize = 8192
InvalidLimit = 0
)

var ReadBufPool = sync.Pool{
New: func() interface{} {
b := make([]byte, 32*1024)
return b
},
}

var tinyBuffersTotalLimit int64 = 4096
var NormalBuffersTotalLimit int64
var HeadBuffersTotalLimit int64

var tinyBuffersCount int64
var normalBuffersCount int64
var headBuffersCount int64

var normalBufAllocId uint64
var headBufAllocId uint64

var normalBufFreecId uint64
var headBufFreeId uint64

var buffersRateLimit = rate.NewLimiter(rate.Limit(16), 16)
var normalBuffersRateLimit = rate.NewLimiter(rate.Limit(16), 16)
var headBuffersRateLimit = rate.NewLimiter(rate.Limit(16), 16)

func NewTinyBufferPool() *sync.Pool {
return &sync.Pool{
New: func() interface{} {
if atomic.LoadInt64(&tinyBuffersCount) >= tinyBuffersTotalLimit {
ctx := context.Background()
buffersRateLimit.Wait(ctx)
}
return make([]byte, DefaultTinySizeLimit)
},
}
}

func NewHeadBufferPool() *sync.Pool {
return &sync.Pool{
New: func() interface{} {
if HeadBuffersTotalLimit != InvalidLimit && atomic.LoadInt64(&headBuffersCount) >= HeadBuffersTotalLimit {
ctx := context.Background()
headBuffersRateLimit.Wait(ctx)
}
return make([]byte, PacketHeaderSize)
},
}
}

func NewNormalBufferPool() *sync.Pool {
return &sync.Pool{
New: func() interface{} {
if NormalBuffersTotalLimit != InvalidLimit && atomic.LoadInt64(&normalBuffersCount) >= NormalBuffersTotalLimit {
ctx := context.Background()
normalBuffersRateLimit.Wait(ctx)
}
return make([]byte, BlockSize)
},
}
}

// BufferPool defines the struct of a buffered pool with 4 objects.
type BufferPool struct {
headPools []chan []byte
normalPools []chan []byte
tinyPool *sync.Pool
headPool *sync.Pool
normalPool *sync.Pool
}

var (
slotCnt = uint64(16)
)

// NewBufferPool returns a new buffered pool.
func NewBufferPool() (bufferP *BufferPool) {
bufferP = &BufferPool{}

bufferP.headPools = make([]chan []byte, slotCnt)
bufferP.normalPools = make([]chan []byte, slotCnt)
for i := 0; i < int(slotCnt); i++ {
bufferP.headPools[i] = make(chan []byte, HeaderBufferPoolSize/slotCnt)
bufferP.normalPools[i] = make(chan []byte, HeaderBufferPoolSize/slotCnt)
}

bufferP.tinyPool = NewTinyBufferPool()
bufferP.headPool = NewHeadBufferPool()
bufferP.normalPool = NewNormalBufferPool()
return bufferP
}
func (bufferP *BufferPool) getHead(id uint64) (data []byte) {
select {
case data = <-bufferP.headPools[id%slotCnt]:
return
default:
return bufferP.headPool.Get().([]byte)
}
}

func (bufferP *BufferPool) getNoraml(id uint64) (data []byte) {
select {
case data = <-bufferP.normalPools[id%slotCnt]:
return
default:
return bufferP.normalPool.Get().([]byte)
}
}

// Get returns the data based on the given size. Different size corresponds to different object in the pool.
func (bufferP *BufferPool) Get(size int) (data []byte, err error) {
if size == PacketHeaderSize {
atomic.AddInt64(&headBuffersCount, 1)
id := atomic.AddUint64(&headBufAllocId, 1)
return bufferP.getHead(id), nil
} else if size == BlockSize {
atomic.AddInt64(&normalBuffersCount, 1)
id := atomic.AddUint64(&normalBufAllocId, 1)
return bufferP.getNoraml(id), nil
} else if size == DefaultTinySizeLimit {
atomic.AddInt64(&tinyBuffersCount, 1)
return bufferP.tinyPool.Get().([]byte), nil
}
return nil, fmt.Errorf("can only support 45 or 65536 bytes")
}

func (bufferP *BufferPool) putHead(index int, data []byte) {
select {
case bufferP.headPools[index] <- data:
return
default:
bufferP.headPool.Put(data)
}
}

func (bufferP *BufferPool) putNormal(index int, data []byte) {
select {
case bufferP.normalPools[index] <- data:
return
default:
bufferP.normalPool.Put(data)
}
}

// Put puts the given data into the buffer pool.
func (bufferP *BufferPool) Put(data []byte) {
if data == nil {
return
}
size := len(data)
if size == PacketHeaderSize {
atomic.AddInt64(&headBuffersCount, -1)
id := atomic.AddUint64(&headBufFreeId, 1)
bufferP.putHead(int(id%slotCnt), data)
} else if size == BlockSize {
atomic.AddInt64(&normalBuffersCount, -1)
id := atomic.AddUint64(&normalBufFreecId, 1)
bufferP.putNormal(int(id%slotCnt), data)
} else if size == DefaultTinySizeLimit {
bufferP.tinyPool.Put(data)
atomic.AddInt64(&tinyBuffersCount, -1)
}
return
}
Loading

0 comments on commit 65726f6

Please sign in to comment.