Skip to content

Commit

Permalink
feat: compatible with V1 preheat (#720)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Jun 28, 2023
1 parent 77bb686 commit 5857e52
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 7 deletions.
2 changes: 2 additions & 0 deletions docs/en/user-guide/preheat/preheat.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ TODO

Use preheat apis for preheating. First create a POST request for preheating, you can refer to [create preheat api document](../../api/api.md#create-preheat)

If the `scheduler_cluster_id` does not exist, it means to preheat all scheduler clusters.

```bash
curl --request POST 'http://dragonfly-manager:8080/api/v1/preheats' \
--header 'Content-Type: application/json' \
Expand Down
3 changes: 2 additions & 1 deletion docs/zh-CN/user-guide/preheat/preheat.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ TODO

用户使用 api 进行预热。首先发送 POST 请求创建预热任务,具体 api 可以参考文档 [create preheat api document](../../api/api.md#create-preheat)

如果 `scheduler_cluster_id` 不存在,表示对所有 scheduler cluster 进行预热。

```bash
curl --request POST 'http://dragonfly-manager:8080/api/v1/preheats' \
--header 'Content-Type: application/json' \
Expand All @@ -28,7 +30,6 @@ curl --request POST 'http://dragonfly-manager:8080/api/v1/preheats' \

使用预热任务 ID 轮训查询任务是否成功,具体 api 可以参考文档 [get preheat api document](../../api/api.md#get-preheat)


```bash
curl --request GET 'http://manager-domain:8080/api/v1/preheats/group_28439e0b-d4c3-43bf-945e-482b54c49dc5'
```
Expand Down
54 changes: 54 additions & 0 deletions manager/handlers/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,57 @@ func (h *Handlers) GetPreheat(ctx *gin.Context) {

ctx.JSON(http.StatusOK, preheat)
}

// @Summary Create V1 Preheat
// @Description create by json config
// @Tags Preheat
// @Accept json
// @Produce json
// @Param CDN body types.CreateV1PreheatRequest true "Preheat"
// @Success 200 {object} types.CreateV1PreheatResponse
// @Failure 400
// @Failure 404
// @Failure 500
// @Router /preheats [post]
func (h *Handlers) CreateV1Preheat(ctx *gin.Context) {
var json types.CreateV1PreheatRequest
if err := ctx.ShouldBindJSON(&json); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}

preheat, err := h.service.CreateV1Preheat(json)
if err != nil {
ctx.Error(err)
return
}

ctx.JSON(http.StatusOK, preheat)
}

// @Summary Get V1 Preheat
// @Description Get Preheat by id
// @Tags Preheat
// @Accept json
// @Produce json
// @Param id path string true "id"
// @Success 200 {object} types.GetV1PreheatResponse
// @Failure 400
// @Failure 404
// @Failure 500
// @Router /preheats/{id} [get]
func (h *Handlers) GetV1Preheat(ctx *gin.Context) {
var params types.PreheatParams
if err := ctx.ShouldBindUri(&params); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}

preheat, err := h.service.GetV1Preheat(params.ID)
if err != nil {
ctx.Error(err)
return
}

ctx.JSON(http.StatusOK, preheat)
}
10 changes: 8 additions & 2 deletions manager/job/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strings"
"time"

logger "d7y.io/dragonfly/v2/internal/dflog"
internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/types"
"d7y.io/dragonfly/v2/pkg/util/net/httputils"
Expand Down Expand Up @@ -110,6 +111,7 @@ func (p *preheat) createGroupJob(files []*internaljob.PreheatRequest, queues []i
for _, file := range files {
args, err := internaljob.MarshalRequest(file)
if err != nil {
logger.Errorf("preheat marshal request: %v, error: %v", file, err)
continue
}

Expand All @@ -120,6 +122,7 @@ func (p *preheat) createGroupJob(files []*internaljob.PreheatRequest, queues []i
})
}
}
logger.Infof("preheat file count: %d queues: %v", len(signatures), queues)

group, err := machineryv1tasks.NewGroup(signatures...)
if err != nil {
Expand Down Expand Up @@ -198,13 +201,16 @@ func (p *preheat) parseLayers(resp *http.Response, filter string, header http.He
var layers []*internaljob.PreheatRequest
for _, v := range manifest.References() {
digest := v.Digest.String()
layers = append(layers, &internaljob.PreheatRequest{
layer := &internaljob.PreheatRequest{
URL: layerURL(image.protocol, image.domain, image.name, digest),
Tag: p.bizTag,
Filter: filter,
Digest: digest,
Headers: httputils.HeaderToMap(header),
})
}

logger.Infof("preheat layer: %+v", layer)
layers = append(layers, layer)
}

return layers, nil
Expand Down
6 changes: 6 additions & 0 deletions manager/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) (
ph.POST("", h.CreatePreheat)
ph.GET(":id", h.GetPreheat)

// Compatible with the V1 preheat.
pv1 := r.Group("preheats")
r.GET("/_ping", h.GetHealth)
pv1.POST("", h.CreateV1Preheat)
pv1.GET(":id", h.GetV1Preheat)

// Health Check
r.GET("/healthy/*action", h.GetHealth)

Expand Down
63 changes: 63 additions & 0 deletions manager/service/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,26 @@
package service

import (
"time"

"d7y.io/dragonfly/v2/manager/model"
"d7y.io/dragonfly/v2/manager/types"

machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
)

const (
// V1PreheatingStatusPending is the preheating is waiting for starting
V1PreheatingStatusPending = "WAITING"

// V1PreheatingStatusRunning is the preheating is running
V1PreheatingStatusRunning = "RUNNING"

// V1PreheatingStatusSuccess is the preheating is success
V1PreheatingStatusSuccess = "SUCCESS"

// V1PreheatingStatusFail is the preheating is failed
V1PreheatingStatusFail = "FAIL"
)

func (s *rest) CreatePreheat(json types.CreatePreheatRequest) (*types.Preheat, error) {
Expand Down Expand Up @@ -63,3 +81,48 @@ func (s *rest) CreatePreheat(json types.CreatePreheatRequest) (*types.Preheat, e
func (s *rest) GetPreheat(id string) (*types.Preheat, error) {
return s.job.GetPreheat(id)
}

func (s *rest) CreateV1Preheat(json types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error) {
p, err := s.CreatePreheat(types.CreatePreheatRequest{
Type: json.Type,
URL: json.URL,
Filter: json.Filter,
Headers: json.Headers,
})
if err != nil {
return nil, err
}

return &types.CreateV1PreheatResponse{
ID: p.ID,
}, nil
}

func (s *rest) GetV1Preheat(id string) (*types.GetV1PreheatResponse, error) {
p, err := s.job.GetPreheat(id)
if err != nil {
return nil, err
}

return &types.GetV1PreheatResponse{
ID: p.ID,
Status: convertStatus(p.Status),
StartTime: p.CreatedAt.String(),
FinishTime: time.Now().String(),
}, nil
}

func convertStatus(status string) string {
switch status {
case machineryv1tasks.StatePending, machineryv1tasks.StateReceived, machineryv1tasks.StateRetry:
return V1PreheatingStatusPending
case machineryv1tasks.StateStarted:
return V1PreheatingStatusRunning
case machineryv1tasks.StateSuccess:
return V1PreheatingStatusSuccess
case machineryv1tasks.StateFailure:
return V1PreheatingStatusFail
}

return V1PreheatingStatusFail
}
2 changes: 2 additions & 0 deletions manager/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type REST interface {

CreatePreheat(types.CreatePreheatRequest) (*types.Preheat, error)
GetPreheat(string) (*types.Preheat, error)
CreateV1Preheat(types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error)
GetV1Preheat(string) (*types.GetV1PreheatResponse, error)
}

type rest struct {
Expand Down
18 changes: 18 additions & 0 deletions manager/types/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,21 @@ type Preheat struct {
Status string `json:"status"`
CreatedAt time.Time `json:"create_at"`
}

type CreateV1PreheatRequest struct {
Type string `json:"type" binding:"required,oneof=image file"`
URL string `json:"url" binding:"required"`
Filter string `json:"filter" binding:"omitempty"`
Headers map[string]string `json:"headers" binding:"omitempty"`
}

type CreateV1PreheatResponse struct {
ID string `json:"ID"`
}

type GetV1PreheatResponse struct {
ID string `json:"ID"`
Status string `json:"status"`
StartTime string `json:"startTime,omitempty"`
FinishTime string `json:"finishTime,omitempty"`
}
9 changes: 6 additions & 3 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,14 @@ func (t *job) preheat(req string) error {
}

// Generate range
if rg := request.Headers["Range"]; len(rg) > 0 {
meta.Range = rg
if request.Headers != nil {
if rg := request.Headers["Range"]; len(rg) > 0 {
meta.Range = rg
}
}

taskID := idgen.TaskID(request.URL, meta)
logger.Debugf("ready to preheat \"%s\", taskID = %s", request.URL, taskID)
logger.Infof("ready to preheat \"%s\", taskID = %s", request.URL, taskID)

task := supervisor.NewTask(taskID, request.URL, meta)
task = t.service.GetOrCreateTask(t.ctx, task)
Expand All @@ -174,6 +176,7 @@ func getPreheatResult(task *supervisor.Task) error {
case supervisor.TaskStatusSuccess:
return nil
default:
logger.Infof("preheat task %v status: %v", task.ID, task.GetStatus())
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion scheduler/supervisor/cdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (c *cdn) receivePiece(ctx context.Context, task *Task, stream *client.Piece
if err != nil {
if err == io.EOF {
logger.Infof("task %s connection closed", task.ID)
if task.GetStatus() == TaskStatusSuccess {
if cdnPeer != nil && task.GetStatus() == TaskStatusSuccess {
span.SetAttributes(config.AttributePeerDownloadSuccess.Bool(true))
return cdnPeer, nil
}
Expand Down
8 changes: 8 additions & 0 deletions scheduler/supervisor/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ func (peer *Peer) GetChildren() *sync.Map {
}

func (peer *Peer) SetParent(parent *Peer) {
if parent == nil {
return
}

peer.parent.Store(parent)
}

Expand Down Expand Up @@ -457,6 +461,10 @@ func (peer *Peer) BindNewConn(stream scheduler.Scheduler_ReportPieceResultServer
}

func (peer *Peer) setConn(conn *Channel) {
if conn == nil {
return
}

peer.conn.Store(conn)
}

Expand Down

0 comments on commit 5857e52

Please sign in to comment.