How does a client use the ring object? #7

Open
awzhgw opened this issue Jul 4, 2020 · 4 comments

awzhgw commented Jul 4, 2020

package main

import (
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	_ "net/http/pprof"
	"os"
	"path"
	"sync/atomic"
	"time"

	"github.com/hodgesds/iouring-go"
)

var (
	size=flag.Int64("size",64,"default write size")
	rootDir=flag.String("root","/data0","default write dir")
	uring=flag.Bool("iouring",true,"is used iouring")
	role=flag.String("role","server","default role")
	addr1=flag.String("remote","127.0.0.1:8888","default remote addr")
	port=8888
	ring *iouring.Ring
)

func inita(){
	var err error
	if *uring{
		ring, err = iouring.New(
			8192,
			&iouring.Params{
				Features: iouring.FeatNoDrop,
			},
			iouring.WithID(100000),
		)
		if err!=nil{
			log.Fatalf("init failed %v",err)
		}
		iouring.FastOpenAllowed()
	}
}


func init() {
	rand.Seed(time.Now().UnixNano())
}
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func RandStringRunes(n int) string {
	b := make([]rune, n)
	for i := range b {
		b[i] = letterRunes[rand.Intn(len(letterRunes))]
	}
	return string(b)
}

func main() {
	flag.Parse()

	inita()
	if *role=="server"{
		go func() {
			e := http.ListenAndServe(fmt.Sprintf(":%v", 8822), nil)
			if e != nil {
				log.Println(fmt.Errorf("cannot listen pprof %v err %v", 8822, e))
				os.Exit(1)
			}
		}()
		l,err:=Listen()
		if err!=nil {
			log.Fatal(err)
		}
		for {
			conn,err:=l.Accept()
			if err!=nil {
				log.Println(fmt.Sprintf("Accept error %v",err))
				continue
			}
			go Write(conn)
		}
	}else {
		go func() {
			e := http.ListenAndServe(fmt.Sprintf(":%v", 8821), nil)
			if e != nil {
				log.Println(fmt.Errorf("cannot listen pprof %v err %v", 8821, e))
				os.Exit(1)
			}
		}()
		conn,err:=Connect(*addr1)
		if err!=nil {
			log.Fatalf("connect error %v",err)
		}
		data:=RandStringRunes(int(*size))
		writeData:=([]byte)(data)
		for {
			_,err=conn.Write(writeData)
			if err!=nil{
				log.Println(fmt.Sprintf("write error %v",err))
				return
			}
		}
	}


}

func Listen()( l net.Listener,err error){
	if *uring{
		fmt.Printf("listening on port: %d\n", port)
		l, err := ring.SockoptListener(
			"tcp",
			fmt.Sprintf(":%d", port),
			func(err error) {
				log.Println(err)
			},
			iouring.SOReuseport,
		)
		if err != nil {
			log.Fatal(err)
		}
		return l,err
	}else {
		l,err=net.Listen("tcp",fmt.Sprintf(":%d", port))
		return l,err
	}
}

func Connect(addr string)(conn net.Conn,err error){
	conn,err=net.DialTimeout("tcp",addr,time.Second)
	if err!=nil {
		log.Printf("Dial to %v err %v", addr, err)
		return
	}
	conn.(*net.TCPConn).SetNoDelay(true)
	conn.(*net.TCPConn).SetLinger(0)
	conn.(*net.TCPConn).SetKeepAlive(true)


	return
}

func Write(conn net.Conn){
	dst, err := os.OpenFile(path.Join(*rootDir,"1.txt"), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		log.Fatal(err)
	}


	data:=make([]byte,*size)
	var cnt uint64
	go func() {
		// Report the per-second write count. Swap reads and resets the
		// counter atomically, so no increments are lost between the two steps.
		ticker := time.NewTicker(time.Second)
		for range ticker.C {
			log.Printf("iops is %v", atomic.SwapUint64(&cnt, 0))
		}
	}()

	if *uring{
		r, err := ring.FileReadWriter(dst)
		if err!=nil {
			log.Fatal(err)
		}
		for {
			_,err=io.ReadFull(conn,data)
			if err!=nil {
				log.Fatalf("read from conn error %v",err)
			}
			fmt.Printf("received data %v\n", string(data))
			_,err=r.Write(data)
			if err!=nil {
				log.Fatalf("write error %v",err)
			}
			atomic.AddUint64(&cnt,1)
		}
	}else {
		for {
			_,err=io.ReadFull(conn,data)
			if err!=nil {
				log.Fatalf("read from conn error %v",err)
			}
			_,err=dst.Write(data)
			if err!=nil {
				log.Fatalf("write error %v",err)
			}
			atomic.AddUint64(&cnt,1)
		}
	}
	dst.Close()
}



This is my test code, but I find that the server only receives 2 packets and then stops receiving anything. Why?

hodgesds (Owner) commented Jul 5, 2020

The server implementation is a little buggy currently. After every operation on the socket, a re-poll io_uring request has to be made to re-enable polling on the socket (you can see the code here). There are a few edge cases with the current socket handling, and I'm pretty sure some things are broken, especially when using clients with keep-alive connections. I've noticed some of the same errors in my testing as well, but haven't had time to look into them more. I've been trying to get a good API for doing batches of operations that work in a single Enter call, which is what will really improve performance compared to the Read/Write operations that exist today (you can see the extra overhead in the benchmarks).

I'm on holiday this weekend, but this looks like a good test case, so I'll continue to test with it as I have time. There's also a WithDebug option that can be passed to the ring (although much of that debugging still needs work).

awzhgw (Author) commented Jul 8, 2020

@hodgesds I am an R&D engineer working on distributed storage, and I am very interested in your project. Our project, github.com/chubaofs/chubaofs, has now joined the CNCF and is written in Go. Would you be willing to polish your iouring library and contribute it to us, or join us in building a world-class storage project?

hodgesds (Owner) commented Jul 8, 2020

Chubaofs looks really interesting! I have to check with my employer to double check licensing/contribution policies but it seems like this library could be useful.

I've just gotten around to getting an initial version of multiple writer (fan out) support, but the benchmarks seem to show some work still needs to be done. The same goes for the batching with the WithDeadline ring option, which also has a benchmark.

I think this project has a lot of potential, but quite a few things need work before it is production ready. I think I will be looking for contributors and will also try to keep users of the project up to date on its current state.

awzhgw (Author) commented Jul 8, 2020

@hodgesds
I mean: once your iouring-go code base is ready, our chubaofs can use it directly.
