Skip to content

Commit

Permalink
Fix: Datanode: when tinyExtentRepair runs auto repair, it has been hitting
Browse files Browse the repository at this point in the history
repairTimeout

Signed-off-by: awzhgw <[email protected]>
  • Loading branch information
awzhgw committed Aug 28, 2019
1 parent 4dd740c commit b370e8d
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 31 deletions.
8 changes: 4 additions & 4 deletions datanode/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ const (
)

const (
EmptyResponse = 'E'
EmptyPacketLength = 9
MaxSyncTinyDeleteBufferSize = 2400000
MaxFullSyncTinyDeleteTime = 3600 * 24
EmptyResponse = 'E'
TinyExtentRepairReadResponseArgLen = 17
MaxSyncTinyDeleteBufferSize = 2400000
MaxFullSyncTinyDeleteTime = 3600 * 24
)
34 changes: 25 additions & 9 deletions datanode/data_partition_repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,9 @@ func (dp *DataPartition) streamRepairExtent(remoteExtentInfo *storage.ExtentInfo
return
}
currFixOffset := localExtentInfo.Size
var (
hasRecoverySize uint64
)
for currFixOffset < remoteExtentInfo.Size {
if currFixOffset >= remoteExtentInfo.Size {
break
Expand All @@ -473,26 +476,26 @@ func (dp *DataPartition) streamRepairExtent(remoteExtentInfo *storage.ExtentInfo

// read 64k streaming repair packet
if err = reply.ReadFromConn(conn, 60); err != nil {
err = errors.Trace(err, "streamRepairExtent receive data error,localExtentSize(%v) remoteExtentSize(%v)",currFixOffset,remoteExtentInfo.Size)
err = errors.Trace(err, "streamRepairExtent receive data error,localExtentSize(%v) remoteExtentSize(%v)", currFixOffset, remoteExtentInfo.Size)
return
}

if reply.ResultCode != proto.OpOk {
err = errors.Trace(fmt.Errorf("unknow result code"),
"streamRepairExtent receive opcode error(%v) ,localExtentSize(%v) remoteExtentSize(%v)", string(reply.Data[:reply.Size]),currFixOffset,remoteExtentInfo.Size)
"streamRepairExtent receive opcode error(%v) ,localExtentSize(%v) remoteExtentSize(%v)", string(reply.Data[:reply.Size]), currFixOffset, remoteExtentInfo.Size)
return
}

if reply.ReqID != request.ReqID || reply.PartitionID != request.PartitionID ||
reply.ExtentID != request.ExtentID {
err = errors.Trace(fmt.Errorf("unavali reply"), "streamRepairExtent receive unavalid "+
"request(%v) reply(%v) ,localExtentSize(%v) remoteExtentSize(%v)", request.GetUniqueLogId(), reply.GetUniqueLogId(),currFixOffset,remoteExtentInfo.Size)
"request(%v) reply(%v) ,localExtentSize(%v) remoteExtentSize(%v)", request.GetUniqueLogId(), reply.GetUniqueLogId(), currFixOffset, remoteExtentInfo.Size)
return
}

if !storage.IsTinyExtent(reply.ExtentID) && (reply.Size == 0 || reply.ExtentOffset != int64(currFixOffset)) {
err = errors.Trace(fmt.Errorf("unavali reply"), "streamRepairExtent receive unavalid "+
"request(%v) reply(%v) localExtentSize(%v) remoteExtentSize(%v)", request.GetUniqueLogId(), reply.GetUniqueLogId(),currFixOffset,remoteExtentInfo.Size)
"request(%v) reply(%v) localExtentSize(%v) remoteExtentSize(%v)", request.GetUniqueLogId(), reply.GetUniqueLogId(), currFixOffset, remoteExtentInfo.Size)
return
}

Expand All @@ -510,13 +513,25 @@ func (dp *DataPartition) streamRepairExtent(remoteExtentInfo *storage.ExtentInfo
isEmptyResponse := false
// Write it to local extent file
if storage.IsTinyExtent(uint64(localExtentInfo.FileID)) {
writeSize := uint64(reply.Size)
if reply.ArgLen != 0 {
currRecoverySize := uint64(reply.Size)
var remoteAvaliSize uint64
if reply.ArgLen == TinyExtentRepairReadResponseArgLen {
remoteAvaliSize = binary.BigEndian.Uint64(reply.Arg[9:TinyExtentRepairReadResponseArgLen])
}
if reply.Arg != nil { //compact v1.2.0 recovery
isEmptyResponse = reply.Arg[0] == EmptyResponse
writeSize = binary.BigEndian.Uint64(reply.Arg[1:EmptyPacketLength])
reply.Size = uint32(writeSize)
}
err = store.TinyExtentRecover(uint64(localExtentInfo.FileID), int64(currFixOffset), int64(writeSize), reply.Data, reply.CRC, isEmptyResponse)
if isEmptyResponse {
currRecoverySize = binary.BigEndian.Uint64(reply.Arg[1:9])
reply.Size = uint32(currRecoverySize)
}
err = store.TinyExtentRecover(uint64(localExtentInfo.FileID), int64(currFixOffset), int64(currRecoverySize), reply.Data, reply.CRC, isEmptyResponse)
if hasRecoverySize+currRecoverySize >= remoteAvaliSize {
log.LogInfof("streamRepairTinyExtent(%v) recover fininsh,remoteAvaliSize(%v) "+
"hasRecoverySize(%v) currRecoverySize(%v)", dp.applyRepairKey(int(localExtentInfo.FileID)),
remoteAvaliSize, hasRecoverySize+currRecoverySize, currRecoverySize)
break
}
} else {
err = store.Write(uint64(localExtentInfo.FileID), int64(currFixOffset), int64(reply.Size), reply.Data, reply.CRC, UpdateSize, BufferWrite)
}
Expand All @@ -526,6 +541,7 @@ func (dp *DataPartition) streamRepairExtent(remoteExtentInfo *storage.ExtentInfo
err = errors.Trace(err, "streamRepairExtent repair data error ")
return
}
hasRecoverySize += uint64(reply.Size)
currFixOffset += uint64(reply.Size)
if currFixOffset >= remoteExtentInfo.Size {
break
Expand Down
10 changes: 5 additions & 5 deletions datanode/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewDisk(path string, restSize uint64, maxErrCnt int, space *SpaceManager) (
d.Path = path
d.ReservedSpace = restSize
d.MaxErrCnt = maxErrCnt
d.RejectWrite=false
d.RejectWrite = false
d.space = space
d.partitionMap = make(map[uint64]*DataPartition)
d.computeUsage()
Expand Down Expand Up @@ -123,10 +123,10 @@ func (d *Disk) computeUsage() (err error) {
if unallocated < 0 {
unallocated = 0
}
if d.Available<=0{
d.RejectWrite=true
}else {
d.RejectWrite=false
if d.Available <= 0 {
d.RejectWrite = true
} else {
d.RejectWrite = false
}
d.Unallocated = uint64(unallocated)

Expand Down
14 changes: 7 additions & 7 deletions datanode/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type DataPartition struct {
snapshotMutex sync.RWMutex
intervalToUpdatePartitionSize int64
loadExtentHeaderStatus int
FullSyncTinyDeleteTime int64
FullSyncTinyDeleteTime int64
}

func CreateDataPartition(dpCfg *dataPartitionCfg, disk *Disk, request *proto.CreateDataPartitionRequest) (dp *DataPartition, err error) {
Expand Down Expand Up @@ -303,7 +303,7 @@ func (dp *DataPartition) Disk() *Disk {
return dp.disk
}

func (dp *DataPartition) IsRejectWrite() bool{
func (dp *DataPartition) IsRejectWrite() bool {
return dp.Disk().RejectWrite
}

Expand Down Expand Up @@ -351,11 +351,11 @@ func (dp *DataPartition) PersistMetadata(dataPartitionCreateType int) (err error
sort.Sort(sp)

md := &DataPartitionMetadata{
VolumeID: dp.config.VolName,
PartitionID: dp.config.PartitionID,
PartitionSize: dp.config.PartitionSize,
Peers: dp.config.Peers,
Hosts: dp.config.Hosts,
VolumeID: dp.config.VolName,
PartitionID: dp.config.PartitionID,
PartitionSize: dp.config.PartitionSize,
Peers: dp.config.Peers,
Hosts: dp.config.Hosts,
DataPartitionCreateType: dataPartitionCreateType,
CreateTime: time.Now().Format(TimeLayout),
}
Expand Down
4 changes: 2 additions & 2 deletions datanode/partition_op_by_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ func (dp *DataPartition) ApplyRandomWrite(command []byte, raftApplyID uint64) (r
resp = proto.OpDiskErr
}
}()
if dp.IsRejectWrite(){
err=fmt.Errorf("partition(%v) disk(%v) err(%v)",dp.partitionID,dp.Disk().Path,syscall.ENOSPC)
if dp.IsRejectWrite() {
err = fmt.Errorf("partition(%v) disk(%v) err(%v)", dp.partitionID, dp.Disk().Path, syscall.ENOSPC)
return
}

Expand Down
14 changes: 10 additions & 4 deletions datanode/wrap_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (s *DataNode) handlePacketToCreateExtent(p *repl.Packet) {
}
}()
partition := p.Object.(*DataPartition)
if partition.Available() <= 0 || partition.disk.Status == proto.ReadOnly || partition.IsRejectWrite(){
if partition.Available() <= 0 || partition.disk.Status == proto.ReadOnly || partition.IsRejectWrite() {
err = storage.NoSpaceError
return
} else if partition.disk.Status == proto.Unavailable {
Expand Down Expand Up @@ -536,10 +536,8 @@ func (s *DataNode) writeEmptyPacketOnTinyExtentRepairRead(reply *repl.Packet, ne
reply.CRC = crc32.ChecksumIEEE(reply.Data)
reply.ResultCode = proto.OpOk
reply.ExtentOffset = currentOffset
reply.ArgLen = EmptyPacketLength
reply.Arg = make([]byte, EmptyPacketLength)
reply.Arg[0] = EmptyResponse
binary.BigEndian.PutUint64(reply.Arg[1:], uint64(replySize))
binary.BigEndian.PutUint64(reply.Arg[1:9], uint64(replySize))
err = reply.WriteToConn(connect)
reply.Size = uint32(replySize)
logContent := fmt.Sprintf("action[operatePacket] %v.",
Expand All @@ -549,6 +547,10 @@ func (s *DataNode) writeEmptyPacketOnTinyExtentRepairRead(reply *repl.Packet, ne
return
}

// attachAvaliSizeOnTinyExtentRepairRead stores avaliSize in the reply's Arg
// buffer as a big-endian uint64 occupying bytes 9 through 16.
// reply.Arg must already have room for at least 17 bytes; the slice
// expression panics otherwise.
func (s *DataNode) attachAvaliSizeOnTinyExtentRepairRead(reply *repl.Packet, avaliSize uint64) {
	avaliSizeField := reply.Arg[9:17]
	binary.BigEndian.PutUint64(avaliSizeField, avaliSize)
}

// Handle handleTinyExtentRepairRead packet.
func (s *DataNode) handleTinyExtentRepairRead(request *repl.Packet, connect net.Conn) {
var (
Expand Down Expand Up @@ -579,6 +581,7 @@ func (s *DataNode) handleTinyExtentRepairRead(request *repl.Packet, connect net.
if uint64(request.ExtentOffset)+uint64(request.Size) > tinyExtentFinfoSize {
needReplySize = int64(tinyExtentFinfoSize - uint64(request.ExtentOffset))
}
avaliReplySize := uint64(needReplySize)

var (
newOffset, newEnd int64
Expand All @@ -588,6 +591,9 @@ func (s *DataNode) handleTinyExtentRepairRead(request *repl.Packet, connect net.
break
}
reply := repl.NewTinyExtentStreamReadResponsePacket(request.ReqID, request.PartitionID, request.ExtentID)
reply.ArgLen = TinyExtentRepairReadResponseArgLen
reply.Arg = make([]byte, TinyExtentRepairReadResponseArgLen)
s.attachAvaliSizeOnTinyExtentRepairRead(reply, avaliReplySize)
newOffset, newEnd, err = store.TinyExtentAvaliOffset(request.ExtentID, offset)
if err != nil {
return
Expand Down

0 comments on commit b370e8d

Please sign in to comment.