chore: optimize stream peer task #763

Merged 1 commit on Oct 29, 2021
15 changes: 9 additions & 6 deletions client/daemon/peer/peertask_base.go
@@ -371,9 +371,10 @@ func (pt *peerTask) pullPiecesFromPeers(cleanUnfinishedFunc func()) {
defer func() {
cleanUnfinishedFunc()
}()

if !pt.waitFirstPeerPacket() {
// TODO if the client falls back to the source directly, we should not log an error here
if ok, backSource := pt.waitFirstPeerPacket(); !ok {
if backSource {
return
}
pt.Errorf("wait first peer packet error")
return
}
@@ -487,7 +488,7 @@ func (pt *peerTask) init(piecePacket *base.PiecePacket, pieceBufferSize int32) (
return pieceRequestCh, true
}

func (pt *peerTask) waitFirstPeerPacket() bool {
func (pt *peerTask) waitFirstPeerPacket() (done bool, backSource bool) {
// wait first available peer
select {
case <-pt.ctx.Done():
@@ -502,13 +503,14 @@ func (pt *peerTask) waitFirstPeerPacket() bool {
// the preparePieceTasksByPeer func has already sent the piece result with the error
pt.Infof("new peer client ready, scheduler time cost: %dus, main peer: %s",
time.Now().Sub(pt.callback.GetStartTime()).Microseconds(), pt.peerPacket.Load().(*scheduler.PeerPacket).MainPeer)
return true
return true, false
}
// when scheduler says dfcodes.SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady
pt.Infof("start download from source due to dfcodes.SchedNeedBackSource")
pt.span.AddEvent("back source due to scheduler says need back source")
pt.needBackSource = true
pt.backSource()
return false, true
case <-time.After(pt.schedulerOption.ScheduleTimeout.Duration):
if pt.schedulerOption.DisableAutoBackSource {
pt.failedReason = reasonScheduleTimeout
@@ -521,9 +523,10 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
pt.span.AddEvent("back source due to schedule timeout")
pt.needBackSource = true
pt.backSource()
return false, true
}
}
return false
return false, false
}

func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
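
For reference, here is a minimal, self-contained sketch of the select-based wait that the new waitFirstPeerPacket implements. The function and channel names below are hypothetical (not from this PR); the real method additionally records spans, failure reasons, DisableAutoBackSource handling, and the needBackSource flag shown in the diff above.

package peer

import (
	"context"
	"time"
)

// waitFirstReady is a hypothetical, simplified stand-in for waitFirstPeerPacket:
// it blocks until the first peer is ready, the context is cancelled, or the
// schedule timeout fires, and reports whether the caller should fall back to
// the source instead of logging an error.
func waitFirstReady(ctx context.Context, ready <-chan struct{}, timeout time.Duration) (done bool, backSource bool) {
	select {
	case <-ctx.Done():
		// cancelled: neither done nor back-source
		return false, false
	case _, ok := <-ready:
		if ok {
			// first peer packet arrived, keep downloading from peers
			return true, false
		}
		// channel closed: the scheduler asked the client to download from the source
		return false, true
	case <-time.After(timeout):
		// schedule timeout: treat as back-source (the real code also honors DisableAutoBackSource)
		return false, true
	}
}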
175 changes: 88 additions & 87 deletions client/daemon/peer/peertask_stream.go
@@ -277,10 +277,7 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.ReadCloser, map[string]s
firstPiece = first
}

pr, pw := io.Pipe()
attr := map[string]string{}
var readCloser io.ReadCloser = pr
var writer io.Writer = pw
if s.contentLength.Load() != -1 {
attr[headers.ContentLength] = fmt.Sprintf("%d", s.contentLength.Load())
} else {
@@ -289,89 +286,9 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.ReadCloser, map[string]s
attr[config.HeaderDragonflyTask] = s.taskID
attr[config.HeaderDragonflyPeer] = s.peerID

go func(first int32) {
defer func() {
s.cancel()
s.span.End()
}()
var (
desired int32
cur int32
wrote int64
err error
//ok bool
cache = make(map[int32]bool)
)
// record the first piece in the cache and compare cur with desired
cache[first] = true
cur = first
for {
if desired == cur {
for {
delete(cache, desired)
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
span.SetAttributes(config.AttributePiece.Int(int(desired)))
wrote, err = s.writeTo(writer, desired)
if err != nil {
span.RecordError(err)
span.End()
s.Errorf("write to pipe error: %s", err)
_ = pw.CloseWithError(err)
return
}
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
span.End()
desired++
cached := cache[desired]
if !cached {
break
}
}
} else {
// not desired piece, cache it
cache[cur] = true
if cur < desired {
s.Warnf("piece number should be equal or greater than %d, received piece number: %d", desired, cur)
}
}

select {
case <-s.ctx.Done():
s.Errorf("ctx.PeerTaskDone due to: %s", s.ctx.Err())
s.span.RecordError(s.ctx.Err())
if err := pw.CloseWithError(s.ctx.Err()); err != nil {
s.Errorf("CloseWithError failed: %s", err)
}
return
case <-s.streamDone:
for {
// all data has been written to local storage and to the pipe writer
if s.readyPieces.Settled() == desired {
pw.Close()
return
}
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
span.SetAttributes(config.AttributePiece.Int(int(desired)))
wrote, err = s.writeTo(pw, desired)
if err != nil {
span.RecordError(err)
span.End()
s.span.RecordError(err)
s.Errorf("write to pipe error: %s", err)
_ = pw.CloseWithError(err)
return
}
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
span.End()
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
desired++
}
case cur = <-s.successPieceCh:
continue
}
}
}(firstPiece)
pr, pw := io.Pipe()
var readCloser io.ReadCloser = pr
go s.writeToPipe(firstPiece, pw)

return readCloser, attr, nil
}
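
As a usage illustration only (not part of this diff): a caller in the same package might consume the refactored Start roughly as below, assuming the returned attributes are response headers and the reader is the read end of the pipe fed by writeToPipe. All names other than Start and streamPeerTask are hypothetical.

package peer

import (
	"io"
	"net/http"
)

// serveStreamTask is a hypothetical caller: it starts the stream peer task,
// copies the returned attributes onto the HTTP response headers, and streams
// the pipe reader to the client.
func serveStreamTask(w http.ResponseWriter, r *http.Request, s *streamPeerTask) {
	rc, attr, err := s.Start(r.Context())
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer rc.Close()
	for k, v := range attr {
		w.Header().Set(k, v)
	}
	// io.Copy returns when writeToPipe closes the pipe (io.EOF) or closes it with an error.
	_, _ = io.Copy(w, rc)
}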
@@ -428,7 +345,7 @@ func (s *streamPeerTask) SetTotalPieces(i int32) {
s.totalPiece = i
}

func (s *streamPeerTask) writeTo(w io.Writer, pieceNum int32) (int64, error) {
func (s *streamPeerTask) writeOnePiece(w io.Writer, pieceNum int32) (int64, error) {
pr, pc, err := s.pieceManager.ReadPiece(s.ctx, &storage.ReadPieceRequest{
PeerTaskMetaData: storage.PeerTaskMetaData{
PeerID: s.peerID,
@@ -476,3 +393,87 @@ func (s *streamPeerTask) backSource() {
_ = s.finish()
return
}

func (s *streamPeerTask) writeToPipe(firstPiece int32, pw *io.PipeWriter) {
defer func() {
s.cancel()
s.span.End()
}()
var (
desired int32
cur int32
wrote int64
err error
cache = make(map[int32]bool)
)
// record the first piece in the cache and compare cur with desired
cache[firstPiece] = true
cur = firstPiece
for {
if desired == cur {
for {
delete(cache, desired)
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
span.SetAttributes(config.AttributePiece.Int(int(desired)))
wrote, err = s.writeOnePiece(pw, desired)
if err != nil {
span.RecordError(err)
span.End()
s.Errorf("write to pipe error: %s", err)
_ = pw.CloseWithError(err)
return
}
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
span.End()
desired++
cached := cache[desired]
if !cached {
break
}
}
} else {
// not desired piece, cache it
cache[cur] = true
if cur < desired {
s.Warnf("piece number should be equal or greater than %d, received piece number: %d", desired, cur)
}
}

select {
case <-s.ctx.Done():
s.Errorf("ctx.PeerTaskDone due to: %s", s.ctx.Err())
s.span.RecordError(s.ctx.Err())
if err := pw.CloseWithError(s.ctx.Err()); err != nil {
s.Errorf("CloseWithError failed: %s", err)
}
return
case <-s.streamDone:
for {
// all data has been written to local storage and to the pipe writer
if s.readyPieces.Settled() == desired {
s.Debugf("all %d pieces wrote to pipe", desired)
pw.Close()
return
}
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
span.SetAttributes(config.AttributePiece.Int(int(desired)))
wrote, err = s.writeOnePiece(pw, desired)
if err != nil {
span.RecordError(err)
span.End()
s.span.RecordError(err)
s.Errorf("write to pipe error: %s", err)
_ = pw.CloseWithError(err)
return
}
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
span.End()
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
desired++
}
case cur = <-s.successPieceCh:
continue
}
}
}
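
The extracted writeToPipe keeps the same io.Pipe producer/consumer design: the goroutine is the producer, and failures are surfaced to the reader via CloseWithError. Below is a minimal standalone sketch of that standard-library pattern, with hypothetical names; it is not the PR's code.

package peer

import (
	"fmt"
	"io"
)

// newPieceReader is a hypothetical, simplified analogue of Start plus writeToPipe:
// the goroutine writes data into the pipe and propagates any failure with
// CloseWithError so the consumer sees it from Read; a clean Close yields io.EOF.
func newPieceReader(pieces [][]byte) io.ReadCloser {
	pr, pw := io.Pipe()
	go func() {
		for i, p := range pieces {
			if _, err := pw.Write(p); err != nil {
				// the reader's next Read returns this error instead of io.EOF
				_ = pw.CloseWithError(fmt.Errorf("write piece %d: %w", i, err))
				return
			}
		}
		_ = pw.Close()
	}()
	return pr
}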