Skip to content

Commit

Permalink
feat: support to set piece length for preheat
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Feb 24, 2025
1 parent 6f58ed0 commit cb73ffb
Show file tree
Hide file tree
Showing 24 changed files with 278 additions and 209 deletions.
1 change: 1 addition & 0 deletions internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
// PreheatRequest defines the request parameters for preheating.
type PreheatRequest struct {
URL string `json:"url" validate:"required,url"`
PieceLength *uint64 `json:"pieceLength" binding:"omitempty,gte=4194304"`
Tag string `json:"tag" validate:"omitempty"`
FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"`
Headers map[string]string `json:"headers" validate:"omitempty"`
Expand Down
2 changes: 2 additions & 0 deletions manager/job/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
files = []internaljob.PreheatRequest{
{
URL: json.URL,
PieceLength: json.PieceLength,
Tag: json.Tag,
FilteredQueryParams: json.FilteredQueryParams,
Headers: json.Headers,
Expand Down Expand Up @@ -349,6 +350,7 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh
header.Set("Accept", v.MediaType)
layer := internaljob.PreheatRequest{
URL: image.blobsURL(v.Digest.String()),
PieceLength: args.PieceLength,
Tag: args.Tag,
FilteredQueryParams: args.FilteredQueryParams,
Headers: nethttp.HeaderToMap(header),
Expand Down
4 changes: 2 additions & 2 deletions manager/job/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler,

taskID := json.TaskID
if json.URL != "" {
taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
taskID = idgen.TaskIDV2(json.URL, json.PieceLength, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
}

args, err := internaljob.MarshalRequest(internaljob.GetTaskRequest{
Expand Down Expand Up @@ -121,7 +121,7 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul

taskID := json.TaskID
if json.URL != "" {
taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
taskID = idgen.TaskIDV2(json.URL, json.PieceLength, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
}

args, err := internaljob.MarshalRequest(internaljob.DeleteTaskRequest{
Expand Down
18 changes: 18 additions & 0 deletions manager/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ type PreheatArgs struct {
// URL is the image url for preheating.
URL string `json:"url" binding:"required"`

// PieceLength is the piece length (in bytes) for downloading the file. The value must
// be greater than or equal to 4194304, for example: 4194304 (4 MiB), 8388608 (8 MiB).
// If the piece length is not specified, it will be calculated according to the
// file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`

// Tag is the tag for preheating.
Tag string `json:"tag" binding:"omitempty"`

Expand Down Expand Up @@ -177,6 +183,12 @@ type GetTaskArgs struct {
// URL is the download url of the task.
URL string `json:"url" binding:"omitempty"`

// PieceLength is the piece length (in bytes) for downloading the file. The value must
// be greater than or equal to 4194304, for example: 4194304 (4 MiB), 8388608 (8 MiB).
// If the piece length is not specified, it will be calculated according to the
// file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`

// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

Expand Down Expand Up @@ -211,6 +223,12 @@ type DeleteTaskArgs struct {
// URL is the download url of the task.
URL string `json:"url" binding:"omitempty"`

// PieceLength is the piece length (in bytes) for downloading the file. The value must
// be greater than or equal to 4194304, for example: 4194304 (4 MiB), 8388608 (8 MiB).
// If the piece length is not specified, it will be calculated according to the
// file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`

// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

Expand Down
10 changes: 8 additions & 2 deletions pkg/idgen/task_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package idgen

import (
"strconv"
"strings"

commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
Expand Down Expand Up @@ -91,11 +92,16 @@ func ParseFilteredQueryParams(rawFilteredQueryParams string) []string {
}

// TaskIDV2 generates v2 version of task id.
//
// The id is the value returned by pkgdigest.SHA256FromStrings for the url
// (after neturl.FilterQueryParams has been applied with filteredQueryParams),
// the tag and the application. When pieceLength is non-nil, its base-10 string
// form is appended as one more input.
//
// NOTE(review): this span comes from a diff view rendered without +/- markers,
// so the superseded signature and return statement appear directly before
// their replacements.
func TaskIDV2(url, tag, application string, filteredQueryParams []string) string {
func TaskIDV2(url string, pieceLength *uint64, tag, application string, filteredQueryParams []string) string {
url, err := neturl.FilterQueryParams(url, filteredQueryParams)
if err != nil {
// Filtering failed: continue with an empty url instead of reporting the error.
url = ""
}

return pkgdigest.SHA256FromStrings(url, tag, application)
// With a nil pieceLength the inputs stay url, tag and application, the same
// arguments the earlier return statement passed.
params := []string{url, tag, application}
if pieceLength != nil {
params = append(params, strconv.FormatUint(*pieceLength, 10))
}

return pkgdigest.SHA256FromStrings(params...)
}
25 changes: 24 additions & 1 deletion pkg/idgen/task_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,12 @@ func TestTaskIDV1(t *testing.T) {
}

func TestTaskIDV2(t *testing.T) {
pieceLength := uint64(1024)

tests := []struct {
name string
url string
pieceLength *uint64
tag string
application string
filters []string
Expand All @@ -118,9 +121,20 @@ func TestTaskIDV2(t *testing.T) {
{
name: "generate taskID",
url: "https://example.com",
pieceLength: &pieceLength,
tag: "foo",
application: "bar",
filters: []string{},
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "99a47b38e9d3321aebebd715bea0483c1400cef2f767f84d97458f9dcedff221")
},
},
{
name: "generate taskID with tag and application",
url: "https://example.com",
tag: "foo",
application: "bar",
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "160fa7f001d9d2e893130894fbb60a5fb006e1d61bff82955f2946582bc9de1d")
Expand All @@ -144,6 +158,15 @@ func TestTaskIDV2(t *testing.T) {
assert.Equal(d, "63dee2822037636b0109876b58e95692233840753a882afa69b9b5ee82a6c57d")
},
},
{
name: "generate taskID with pieceLength",
url: "https://example.com",
pieceLength: &pieceLength,
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "40c21de3ad2f1470ca1a19a2ad2577803a1829851f6cf862ffa2d4577ae51d38")
},
},
{
name: "generate taskID with filters",
url: "https://example.com?foo=foo&bar=bar",
Expand All @@ -157,7 +180,7 @@ func TestTaskIDV2(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, TaskIDV2(tc.url, tc.tag, tc.application, tc.filters))
tc.expect(t, TaskIDV2(tc.url, tc.pieceLength, tc.tag, tc.application, tc.filters))
})
}
}
7 changes: 5 additions & 2 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ func (j *job) preheat(ctx context.Context, data string) (string, error) {
return "", err
}

taskID := idgen.TaskIDV2(req.URL, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
taskID := idgen.TaskIDV2(req.URL, req.PieceLength, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
log := logger.WithTask(taskID, req.URL)
log.Infof("preheat %s request: %#v", req.URL, req)
log.Infof("preheat %s %d request: %#v", req.URL, req.PieceLength, req)

ctx, cancel := context.WithTimeout(ctx, req.Timeout)
defer cancel()
Expand Down Expand Up @@ -299,6 +299,7 @@ func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *inter
taskID,
&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
Url: req.URL,
PieceLength: req.PieceLength,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down Expand Up @@ -440,6 +441,7 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
taskID,
&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
Url: req.URL,
PieceLength: req.PieceLength,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down Expand Up @@ -583,6 +585,7 @@ func (j *job) preheatV2(ctx context.Context, taskID string, req *internaljob.Pre
stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{
Download: &commonv2.Download{
Url: req.URL,
PieceLength: req.PieceLength,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down
2 changes: 1 addition & 1 deletion scheduler/resource/standard/host_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func TestHostManager_RunGC(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit)
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
hostManager, err := newHostManager(mockHostGCConfig, gc)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions scheduler/resource/standard/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ func TestHost_LoadPeer(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, host)

host.StorePeer(mockPeer)
Expand Down Expand Up @@ -665,7 +665,7 @@ func TestHost_StorePeer(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(tc.peerID, mockTask, host)

host.StorePeer(mockPeer)
Expand Down Expand Up @@ -711,7 +711,7 @@ func TestHost_DeletePeer(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, host)

host.StorePeer(mockPeer)
Expand Down Expand Up @@ -763,7 +763,7 @@ func TestHost_LeavePeers(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, host)

tc.expect(t, host, mockPeer)
Expand Down Expand Up @@ -815,7 +815,7 @@ func TestHost_FreeUploadCount(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, host)

tc.expect(t, host, mockTask, mockPeer)
Expand Down
10 changes: 5 additions & 5 deletions scheduler/resource/standard/peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestPeerManager_Load(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestPeerManager_Store(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestPeerManager_LoadOrStore(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestPeerManager_Delete(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -578,7 +578,7 @@ func TestPeerManager_RunGC(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(tc.gcConfig, gc)
if err != nil {
Expand Down
Loading

0 comments on commit cb73ffb

Please sign in to comment.