Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support to set piece length for preheat #3848

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
// PreheatRequest defines the request parameters for preheating.
type PreheatRequest struct {
URL string `json:"url" validate:"required,url"`
PieceLength *uint64 `json:"pieceLength" binding:"omitempty,gte=4194304"`
Tag string `json:"tag" validate:"omitempty"`
FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"`
Headers map[string]string `json:"headers" validate:"omitempty"`
Expand Down
2 changes: 2 additions & 0 deletions manager/job/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
files = []internaljob.PreheatRequest{
{
URL: json.URL,
PieceLength: json.PieceLength,
Tag: json.Tag,
FilteredQueryParams: json.FilteredQueryParams,
Headers: json.Headers,
Expand Down Expand Up @@ -349,6 +350,7 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh
header.Set("Accept", v.MediaType)
layer := internaljob.PreheatRequest{
URL: image.blobsURL(v.Digest.String()),
PieceLength: args.PieceLength,
Tag: args.Tag,
FilteredQueryParams: args.FilteredQueryParams,
Headers: nethttp.HeaderToMap(header),
Expand Down
4 changes: 2 additions & 2 deletions manager/job/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler,

taskID := json.TaskID
if json.URL != "" {
taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
taskID = idgen.TaskIDV2(json.URL, json.PieceLength, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
}

args, err := internaljob.MarshalRequest(internaljob.GetTaskRequest{
Expand Down Expand Up @@ -121,7 +121,7 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul

taskID := json.TaskID
if json.URL != "" {
taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
taskID = idgen.TaskIDV2(json.URL, json.PieceLength, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
}

args, err := internaljob.MarshalRequest(internaljob.DeleteTaskRequest{
Expand Down
18 changes: 18 additions & 0 deletions manager/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ type PreheatArgs struct {
// URL is the image url for preheating.
URL string `json:"url" binding:"required"`

// PieceLength is the piece length(bytes) for downloading file. The value needs to
// be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib).
// If the piece length is not specified, the piece length will be calculated
// according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`

// Tag is the tag for preheating.
Tag string `json:"tag" binding:"omitempty"`

Expand Down Expand Up @@ -177,6 +183,12 @@ type GetTaskArgs struct {
// URL is the download url of the task.
URL string `json:"url" binding:"omitempty"`

// PieceLength is the piece length(bytes) for downloading file. The value needs to
// be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib).
// If the piece length is not specified, the piece length will be calculated
// according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`

// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

Expand Down Expand Up @@ -211,6 +223,12 @@ type DeleteTaskArgs struct {
// URL is the download url of the task.
URL string `json:"url" binding:"omitempty"`

// PieceLength is the piece length(bytes) for downloading file. The value needs to
// be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib).
// If the piece length is not specified, the piece length will be calculated
// according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`

// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

Expand Down
10 changes: 8 additions & 2 deletions pkg/idgen/task_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package idgen

import (
"strconv"
"strings"

commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
Expand Down Expand Up @@ -91,11 +92,16 @@ func ParseFilteredQueryParams(rawFilteredQueryParams string) []string {
}

// TaskIDV2 generates v2 version of task id.
func TaskIDV2(url, tag, application string, filteredQueryParams []string) string {
func TaskIDV2(url string, pieceLength *uint64, tag, application string, filteredQueryParams []string) string {
url, err := neturl.FilterQueryParams(url, filteredQueryParams)
if err != nil {
url = ""
}

return pkgdigest.SHA256FromStrings(url, tag, application)
params := []string{url, tag, application}
if pieceLength != nil {
params = append(params, strconv.FormatUint(*pieceLength, 10))
}

return pkgdigest.SHA256FromStrings(params...)
}
25 changes: 24 additions & 1 deletion pkg/idgen/task_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,12 @@ func TestTaskIDV1(t *testing.T) {
}

func TestTaskIDV2(t *testing.T) {
pieceLength := uint64(1024)

tests := []struct {
name string
url string
pieceLength *uint64
tag string
application string
filters []string
Expand All @@ -118,9 +121,20 @@ func TestTaskIDV2(t *testing.T) {
{
name: "generate taskID",
url: "https://example.com",
pieceLength: &pieceLength,
tag: "foo",
application: "bar",
filters: []string{},
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "99a47b38e9d3321aebebd715bea0483c1400cef2f767f84d97458f9dcedff221")
},
},
{
name: "generate taskID with tag and application",
url: "https://example.com",
tag: "foo",
application: "bar",
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "160fa7f001d9d2e893130894fbb60a5fb006e1d61bff82955f2946582bc9de1d")
Expand All @@ -144,6 +158,15 @@ func TestTaskIDV2(t *testing.T) {
assert.Equal(d, "63dee2822037636b0109876b58e95692233840753a882afa69b9b5ee82a6c57d")
},
},
{
name: "generate taskID with pieceLength",
url: "https://example.com",
pieceLength: &pieceLength,
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "40c21de3ad2f1470ca1a19a2ad2577803a1829851f6cf862ffa2d4577ae51d38")
},
},
{
name: "generate taskID with filters",
url: "https://example.com?foo=foo&bar=bar",
Expand All @@ -157,7 +180,7 @@ func TestTaskIDV2(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, TaskIDV2(tc.url, tc.tag, tc.application, tc.filters))
tc.expect(t, TaskIDV2(tc.url, tc.pieceLength, tc.tag, tc.application, tc.filters))
})
}
}
7 changes: 5 additions & 2 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ func (j *job) preheat(ctx context.Context, data string) (string, error) {
return "", err
}

taskID := idgen.TaskIDV2(req.URL, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
taskID := idgen.TaskIDV2(req.URL, req.PieceLength, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
log := logger.WithTask(taskID, req.URL)
log.Infof("preheat %s request: %#v", req.URL, req)
log.Infof("preheat %s %d request: %#v", req.URL, req.PieceLength, req)

ctx, cancel := context.WithTimeout(ctx, req.Timeout)
defer cancel()
Expand Down Expand Up @@ -299,6 +299,7 @@ func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *inter
taskID,
&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
Url: req.URL,
PieceLength: req.PieceLength,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down Expand Up @@ -440,6 +441,7 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
taskID,
&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
Url: req.URL,
PieceLength: req.PieceLength,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down Expand Up @@ -583,6 +585,7 @@ func (j *job) preheatV2(ctx context.Context, taskID string, req *internaljob.Pre
stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{
Download: &commonv2.Download{
Url: req.URL,
PieceLength: req.PieceLength,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down
2 changes: 1 addition & 1 deletion scheduler/resource/standard/host_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func TestHostManager_RunGC(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit)
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
hostManager, err := newHostManager(mockHostGCConfig, gc)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions scheduler/resource/standard/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ func TestHost_LoadPeer(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, host)

host.StorePeer(mockPeer)
Expand Down Expand Up @@ -665,7 +665,7 @@ func TestHost_StorePeer(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(tc.peerID, mockTask, host)

host.StorePeer(mockPeer)
Expand Down Expand Up @@ -711,7 +711,7 @@ func TestHost_DeletePeer(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, host)

host.StorePeer(mockPeer)
Expand Down Expand Up @@ -763,7 +763,7 @@ func TestHost_LeavePeers(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, host)

tc.expect(t, host, mockPeer)
Expand Down Expand Up @@ -815,7 +815,7 @@ func TestHost_FreeUploadCount(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, host)

tc.expect(t, host, mockTask, mockPeer)
Expand Down
10 changes: 5 additions & 5 deletions scheduler/resource/standard/peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestPeerManager_Load(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestPeerManager_Store(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestPeerManager_LoadOrStore(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestPeerManager_Delete(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -578,7 +578,7 @@ func TestPeerManager_RunGC(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(tc.gcConfig, gc)
if err != nil {
Expand Down
Loading
Loading